From b8a7c6a3cd758508c4cdcb7b811a839f5fe080d9 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 15 Aug 2023 16:40:26 +0300 Subject: [PATCH] Don't init unneeded actors and consumers for dedicated Rule Engine --- .../server/actors/ActorSystemContext.java | 1 + .../server/actors/app/AppActor.java | 54 ++++++++++-------- .../server/actors/tenant/TenantActor.java | 23 ++++---- .../DefaultTbRuleEngineConsumerService.java | 55 +++++++++++-------- .../discovery/HashPartitionServiceTest.java | 22 ++++++++ .../DefaultTbServiceInfoProvider.java | 3 + .../queue/discovery/HashPartitionService.java | 15 +++++ .../queue/discovery/PartitionService.java | 3 + 8 files changed, 118 insertions(+), 58 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 7e5dfed5f0..1abfe79a76 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -249,6 +249,7 @@ public class ActorSystemContext { private RuleNodeStateService ruleNodeStateService; @Autowired + @Getter private PartitionService partitionService; @Autowired diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index e8a56fab16..52abeca2c0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -46,8 +46,8 @@ import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import java.util.HashSet; +import java.util.Optional; import java.util.Set; -import java.util.UUID; @Slf4j public class AppActor extends ContextAwareActor { @@ -124,14 +124,13 @@ public class AppActor extends ContextAwareActor { try { if (systemContext.isTenantComponentsInitEnabled()) { PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT); - Set assignedProfiles = systemContext.getServiceInfoProvider().getAssignedTenantProfiles(); for (Tenant tenant : tenantIterator) { - if (!assignedProfiles.isEmpty() && !assignedProfiles.contains(tenant.getTenantProfileId().getId())) { - continue; - } log.debug("[{}] Creating tenant actor", tenant.getId()); - getOrCreateTenantActor(tenant.getId()); - log.debug("[{}] Tenant actor created.", tenant.getId()); + getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> { + log.debug("[{}] Tenant actor created.", tenant.getId()); + }, () -> { + log.debug("[{}] Skipped actor creation", tenant.getId()); + }); } } log.info("Main system actor started."); @@ -145,7 +144,9 @@ public class AppActor extends ContextAwareActor { msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); } else { if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg); + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> { + actor.tell(msg); + }, () -> msg.getMsg().getCallback().onSuccess()); } else { msg.getMsg().getCallback().onSuccess(); } @@ -165,12 +166,13 @@ public class AppActor extends ContextAwareActor { log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); deletedTenants.add(tenantId); ctx.stop(new TbEntityActorId(tenantId)); - } else { - target = getOrCreateTenantActor(msg.getTenantId()); + return; } - } else { - target = getOrCreateTenantActor(msg.getTenantId()); } + target = getOrCreateTenantActor(msg.getTenantId()).orElseGet(() -> { + log.debug("Ignoring component lifecycle msg for tenant {} because it is not managed by this service", msg.getTenantId()); + return null; + }); } if (target != null) { target.tellWithHighPriority(msg); @@ -181,12 +183,13 @@ public class AppActor extends ContextAwareActor { private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { if (!deletedTenants.contains(msg.getTenantId())) { - TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId()); - if (priority) { - tenantActor.tellWithHighPriority(msg); - } else { - tenantActor.tell(msg); - } + getOrCreateTenantActor(msg.getTenantId()).ifPresent(tenantActor -> { + if (priority) { + tenantActor.tellWithHighPriority(msg); + } else { + tenantActor.tell(msg); + } + }); } else { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); @@ -194,10 +197,15 @@ public class AppActor extends ContextAwareActor { } } - private TbActorRef getOrCreateTenantActor(TenantId tenantId) { - return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), - () -> DefaultActorService.TENANT_DISPATCHER_NAME, - () -> new TenantActor.ActorCreator(systemContext, tenantId)); + private Optional getOrCreateTenantActor(TenantId tenantId) { + if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) || + systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + return Optional.of(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), + () -> DefaultActorService.TENANT_DISPATCHER_NAME, + () -> new TenantActor.ActorCreator(systemContext, tenantId))); + } else { + return Optional.empty(); + } } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { @@ -205,7 +213,7 @@ public class AppActor extends ContextAwareActor { if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); } else { - target = getOrCreateTenantActor(msg.getTenantId()); + target = getOrCreateTenantActor(msg.getTenantId()).orElse(null); } if (target != null) { target.tellWithHighPriority(msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 84b7757846..6cc249c87c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -32,7 +32,6 @@ import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -83,21 +82,21 @@ public class TenantActor extends RuleChainManagerActor { cantFindTenant = true; log.info("[{}] Started tenant actor for missing tenant.", tenantId); } else { - TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId()); - isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); if (isRuleEngine) { - try { - if (getApiUsageState().isReExecEnabled()) { - log.debug("[{}] Going to init rule chains", tenantId); - initRuleChains(); - } else { - log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + try { + if (getApiUsageState().isReExecEnabled()) { + log.debug("[{}] Going to init rule chains", tenantId); + initRuleChains(); + } else { + log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + } + } catch (Exception e) { + log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); + cantFindTenant = true; } - } catch (Exception e) { - log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); - cantFindTenant = true; } } log.debug("[{}] Tenant actor started.", tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index a18b5c099c..19dea11665 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -156,7 +156,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer"); List queues = queueService.findAllQueues(); for (Queue configuration : queues) { - initConsumer(configuration); // TODO: if this Rule Engine is assigned specific profile, don't init other consumers and properly handle queue update events + if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { + initConsumer(configuration); + } } } @@ -183,7 +185,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (event.getServiceType().equals(getServiceType())) { String serviceQueue = event.getQueueKey().getQueueName(); log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions()); - if (!consumerConfigurations.get(event.getQueueKey()).isConsumerPerPartition()) { + Queue configuration = consumerConfigurations.get(event.getQueueKey()); + if (configuration == null) { + log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey()); + return; + } + if (!configuration.isConsumerPerPartition()) { consumers.get(event.getQueueKey()).subscribe(event.getPartitions()); } else { log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions()); @@ -425,32 +432,34 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { log.info("Received queue update msg: [{}]", queueUpdateMsg); - String queueName = queueUpdateMsg.getQueueName(); TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); - Queue queue = queueService.findQueueById(tenantId, queueId); - Queue oldQueue = consumerConfigurations.remove(queueKey); - if (oldQueue != null) { - if (oldQueue.isConsumerPerPartition()) { - TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey); - ReentrantLock lock = consumerPerPartition.getLock(); - try { - lock.lock(); - consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); - } finally { - lock.unlock(); + if (partitionService.isManagedByCurrentService(tenantId)) { + QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); + String queueName = queueUpdateMsg.getQueueName(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); + Queue queue = queueService.findQueueById(tenantId, queueId); + Queue oldQueue = consumerConfigurations.remove(queueKey); + if (oldQueue != null) { + if (oldQueue.isConsumerPerPartition()) { + TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey); + ReentrantLock lock = consumerPerPartition.getLock(); + try { + lock.lock(); + consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); + } finally { + lock.unlock(); + } + } else { + TbQueueConsumer> consumer = consumers.remove(queueKey); + consumer.unsubscribe(); } - } else { - TbQueueConsumer> consumer = consumers.remove(queueKey); - consumer.unsubscribe(); } - } - initConsumer(queue); + initConsumer(queue); - if (!queue.isConsumerPerPartition()) { - launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName); + if (!queue.isConsumerPerPartition()) { + launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName); + } } partitionService.updateQueue(queueUpdateMsg); diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 84024ecd1e..7bd9ec576f 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -274,4 +275,25 @@ public class HashPartitionServiceTest { }); } + @Test + public void testIsManagedByCurrentServiceCheck() { + TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID()); + when(discoveryService.getAssignedTenantProfiles()).thenReturn(Set.of(isolatedProfileId.getId())); // dedicated server + TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID()); + + TenantId isolatedTenantId = new TenantId(UUID.randomUUID()); + when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true)); + TenantId regularTenantId = new TenantId(UUID.randomUUID()); + when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false)); + + assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue(); + assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse(); + + + when(discoveryService.getAssignedTenantProfiles()).thenReturn(Collections.emptySet()); // common server + + assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue(); + assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue(); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index e9013ef345..64a8b70a1f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -85,6 +85,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { } else { serviceTypes = Collections.singletonList(ServiceType.of(serviceType)); } + if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) { + assignedTenantProfiles = Collections.emptySet(); + } generateNewServiceInfoWithCurrentSystemInfo(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 4c1894eae1..2db342d4e6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -186,6 +186,21 @@ public class HashPartitionService implements PartitionService { removeTenant(tenantId); } + @Override + public boolean isManagedByCurrentService(TenantId tenantId) { + Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); + if (assignedTenantProfiles.isEmpty()) { + // TODO: refactor this for common servers + return true; + } else { + if (tenantId.isSysTenantId()) { + return false; + } + TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId(); + return assignedTenantProfiles.contains(profileId.getId()); + } + } + @Override public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) { TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index faa4d956a8..b55ba79f67 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -64,4 +64,7 @@ public interface PartitionService { void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg); void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + + boolean isManagedByCurrentService(TenantId tenantId); + }