Don't init unneeded actors and consumers for dedicated Rule Engine
This commit is contained in:
		
							parent
							
								
									6751820e0a
								
							
						
					
					
						commit
						b8a7c6a3cd
					
				@ -249,6 +249,7 @@ public class ActorSystemContext {
 | 
			
		||||
    private RuleNodeStateService ruleNodeStateService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    @Getter
 | 
			
		||||
    private PartitionService partitionService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
 | 
			
		||||
@ -46,8 +46,8 @@ import org.thingsboard.server.dao.tenant.TenantService;
 | 
			
		||||
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
 | 
			
		||||
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class AppActor extends ContextAwareActor {
 | 
			
		||||
@ -124,14 +124,13 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
        try {
 | 
			
		||||
            if (systemContext.isTenantComponentsInitEnabled()) {
 | 
			
		||||
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
 | 
			
		||||
                Set<UUID> assignedProfiles = systemContext.getServiceInfoProvider().getAssignedTenantProfiles();
 | 
			
		||||
                for (Tenant tenant : tenantIterator) {
 | 
			
		||||
                    if (!assignedProfiles.isEmpty() && !assignedProfiles.contains(tenant.getTenantProfileId().getId())) {
 | 
			
		||||
                        continue;
 | 
			
		||||
                    }
 | 
			
		||||
                    log.debug("[{}] Creating tenant actor", tenant.getId());
 | 
			
		||||
                    getOrCreateTenantActor(tenant.getId());
 | 
			
		||||
                    log.debug("[{}] Tenant actor created.", tenant.getId());
 | 
			
		||||
                    getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {
 | 
			
		||||
                        log.debug("[{}] Tenant actor created.", tenant.getId());
 | 
			
		||||
                    }, () -> {
 | 
			
		||||
                        log.debug("[{}] Skipped actor creation", tenant.getId());
 | 
			
		||||
                    });
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            log.info("Main system actor started.");
 | 
			
		||||
@ -145,7 +144,9 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
            msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
 | 
			
		||||
        } else {
 | 
			
		||||
            if (!deletedTenants.contains(msg.getTenantId())) {
 | 
			
		||||
                getOrCreateTenantActor(msg.getTenantId()).tell(msg);
 | 
			
		||||
                getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> {
 | 
			
		||||
                    actor.tell(msg);
 | 
			
		||||
                }, () -> msg.getMsg().getCallback().onSuccess());
 | 
			
		||||
            } else {
 | 
			
		||||
                msg.getMsg().getCallback().onSuccess();
 | 
			
		||||
            }
 | 
			
		||||
@ -165,12 +166,13 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
                    log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
 | 
			
		||||
                    deletedTenants.add(tenantId);
 | 
			
		||||
                    ctx.stop(new TbEntityActorId(tenantId));
 | 
			
		||||
                } else {
 | 
			
		||||
                    target = getOrCreateTenantActor(msg.getTenantId());
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                target = getOrCreateTenantActor(msg.getTenantId());
 | 
			
		||||
            }
 | 
			
		||||
            target = getOrCreateTenantActor(msg.getTenantId()).orElseGet(() -> {
 | 
			
		||||
                log.debug("Ignoring component lifecycle msg for tenant {} because it is not managed by this service", msg.getTenantId());
 | 
			
		||||
                return null;
 | 
			
		||||
            });
 | 
			
		||||
        }
 | 
			
		||||
        if (target != null) {
 | 
			
		||||
            target.tellWithHighPriority(msg);
 | 
			
		||||
@ -181,12 +183,13 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
 | 
			
		||||
    private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
 | 
			
		||||
        if (!deletedTenants.contains(msg.getTenantId())) {
 | 
			
		||||
            TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
 | 
			
		||||
            if (priority) {
 | 
			
		||||
                tenantActor.tellWithHighPriority(msg);
 | 
			
		||||
            } else {
 | 
			
		||||
                tenantActor.tell(msg);
 | 
			
		||||
            }
 | 
			
		||||
            getOrCreateTenantActor(msg.getTenantId()).ifPresent(tenantActor -> {
 | 
			
		||||
                if (priority) {
 | 
			
		||||
                    tenantActor.tellWithHighPriority(msg);
 | 
			
		||||
                } else {
 | 
			
		||||
                    tenantActor.tell(msg);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
        } else {
 | 
			
		||||
            if (msg instanceof TransportToDeviceActorMsgWrapper) {
 | 
			
		||||
                ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
 | 
			
		||||
@ -194,10 +197,15 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
 | 
			
		||||
        return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
 | 
			
		||||
                () -> DefaultActorService.TENANT_DISPATCHER_NAME,
 | 
			
		||||
                () -> new TenantActor.ActorCreator(systemContext, tenantId));
 | 
			
		||||
    private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
 | 
			
		||||
        if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) ||
 | 
			
		||||
                systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
 | 
			
		||||
            return Optional.of(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
 | 
			
		||||
                    () -> DefaultActorService.TENANT_DISPATCHER_NAME,
 | 
			
		||||
                    () -> new TenantActor.ActorCreator(systemContext, tenantId)));
 | 
			
		||||
        } else {
 | 
			
		||||
            return Optional.empty();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
 | 
			
		||||
@ -205,7 +213,7 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
        if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
 | 
			
		||||
            log.warn("Message has system tenant id: {}", msg);
 | 
			
		||||
        } else {
 | 
			
		||||
            target = getOrCreateTenantActor(msg.getTenantId());
 | 
			
		||||
            target = getOrCreateTenantActor(msg.getTenantId()).orElse(null);
 | 
			
		||||
        }
 | 
			
		||||
        if (target != null) {
 | 
			
		||||
            target.tellWithHighPriority(msg);
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,6 @@ import org.thingsboard.server.actors.service.DefaultActorService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageState;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.Tenant;
 | 
			
		||||
import org.thingsboard.server.common.data.TenantProfile;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.Edge;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
@ -83,21 +82,21 @@ public class TenantActor extends RuleChainManagerActor {
 | 
			
		||||
                cantFindTenant = true;
 | 
			
		||||
                log.info("[{}] Started tenant actor for missing tenant.", tenantId);
 | 
			
		||||
            } else {
 | 
			
		||||
                TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());
 | 
			
		||||
 | 
			
		||||
                isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
 | 
			
		||||
                isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
 | 
			
		||||
                if (isRuleEngine) {
 | 
			
		||||
                    try {
 | 
			
		||||
                        if (getApiUsageState().isReExecEnabled()) {
 | 
			
		||||
                            log.debug("[{}] Going to init rule chains", tenantId);
 | 
			
		||||
                            initRuleChains();
 | 
			
		||||
                        } else {
 | 
			
		||||
                            log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
 | 
			
		||||
                    if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
 | 
			
		||||
                        try {
 | 
			
		||||
                            if (getApiUsageState().isReExecEnabled()) {
 | 
			
		||||
                                log.debug("[{}] Going to init rule chains", tenantId);
 | 
			
		||||
                                initRuleChains();
 | 
			
		||||
                            } else {
 | 
			
		||||
                                log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
 | 
			
		||||
                            }
 | 
			
		||||
                        } catch (Exception e) {
 | 
			
		||||
                            log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
 | 
			
		||||
                            cantFindTenant = true;
 | 
			
		||||
                        }
 | 
			
		||||
                    } catch (Exception e) {
 | 
			
		||||
                        log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
 | 
			
		||||
                        cantFindTenant = true;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                log.debug("[{}] Tenant actor started.", tenantId);
 | 
			
		||||
 | 
			
		||||
@ -156,7 +156,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
        super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer");
 | 
			
		||||
        List<Queue> queues = queueService.findAllQueues();
 | 
			
		||||
        for (Queue configuration : queues) {
 | 
			
		||||
            initConsumer(configuration); // TODO: if this Rule Engine is assigned specific profile, don't init other consumers and properly handle queue update events
 | 
			
		||||
            if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
 | 
			
		||||
                initConsumer(configuration);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -183,7 +185,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
        if (event.getServiceType().equals(getServiceType())) {
 | 
			
		||||
            String serviceQueue = event.getQueueKey().getQueueName();
 | 
			
		||||
            log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions());
 | 
			
		||||
            if (!consumerConfigurations.get(event.getQueueKey()).isConsumerPerPartition()) {
 | 
			
		||||
            Queue configuration = consumerConfigurations.get(event.getQueueKey());
 | 
			
		||||
            if (configuration == null) {
 | 
			
		||||
                log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            if (!configuration.isConsumerPerPartition()) {
 | 
			
		||||
                consumers.get(event.getQueueKey()).subscribe(event.getPartitions());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions());
 | 
			
		||||
@ -425,32 +432,34 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
 | 
			
		||||
    private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
 | 
			
		||||
        log.info("Received queue update msg: [{}]", queueUpdateMsg);
 | 
			
		||||
        String queueName = queueUpdateMsg.getQueueName();
 | 
			
		||||
        TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
 | 
			
		||||
        QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
 | 
			
		||||
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
 | 
			
		||||
        Queue queue = queueService.findQueueById(tenantId, queueId);
 | 
			
		||||
        Queue oldQueue = consumerConfigurations.remove(queueKey);
 | 
			
		||||
        if (oldQueue != null) {
 | 
			
		||||
            if (oldQueue.isConsumerPerPartition()) {
 | 
			
		||||
                TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey);
 | 
			
		||||
                ReentrantLock lock = consumerPerPartition.getLock();
 | 
			
		||||
                try {
 | 
			
		||||
                    lock.lock();
 | 
			
		||||
                    consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe);
 | 
			
		||||
                } finally {
 | 
			
		||||
                    lock.unlock();
 | 
			
		||||
        if (partitionService.isManagedByCurrentService(tenantId)) {
 | 
			
		||||
            QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
 | 
			
		||||
            String queueName = queueUpdateMsg.getQueueName();
 | 
			
		||||
            QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
 | 
			
		||||
            Queue queue = queueService.findQueueById(tenantId, queueId);
 | 
			
		||||
            Queue oldQueue = consumerConfigurations.remove(queueKey);
 | 
			
		||||
            if (oldQueue != null) {
 | 
			
		||||
                if (oldQueue.isConsumerPerPartition()) {
 | 
			
		||||
                    TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey);
 | 
			
		||||
                    ReentrantLock lock = consumerPerPartition.getLock();
 | 
			
		||||
                    try {
 | 
			
		||||
                        lock.lock();
 | 
			
		||||
                        consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe);
 | 
			
		||||
                    } finally {
 | 
			
		||||
                        lock.unlock();
 | 
			
		||||
                    }
 | 
			
		||||
                } else {
 | 
			
		||||
                    TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
 | 
			
		||||
                    consumer.unsubscribe();
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
 | 
			
		||||
                consumer.unsubscribe();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        initConsumer(queue);
 | 
			
		||||
            initConsumer(queue);
 | 
			
		||||
 | 
			
		||||
        if (!queue.isConsumerPerPartition()) {
 | 
			
		||||
            launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName);
 | 
			
		||||
            if (!queue.isConsumerPerPartition()) {
 | 
			
		||||
                launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        partitionService.updateQueue(queueUpdateMsg);
 | 
			
		||||
 | 
			
		||||
@ -46,6 +46,7 @@ import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
@ -274,4 +275,25 @@ public class HashPartitionServiceTest {
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testIsManagedByCurrentServiceCheck() {
 | 
			
		||||
        TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID());
 | 
			
		||||
        when(discoveryService.getAssignedTenantProfiles()).thenReturn(Set.of(isolatedProfileId.getId())); // dedicated server
 | 
			
		||||
        TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID());
 | 
			
		||||
 | 
			
		||||
        TenantId isolatedTenantId = new TenantId(UUID.randomUUID());
 | 
			
		||||
        when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true));
 | 
			
		||||
        TenantId regularTenantId = new TenantId(UUID.randomUUID());
 | 
			
		||||
        when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false));
 | 
			
		||||
 | 
			
		||||
        assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
 | 
			
		||||
        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        when(discoveryService.getAssignedTenantProfiles()).thenReturn(Collections.emptySet()); // common server
 | 
			
		||||
 | 
			
		||||
        assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
 | 
			
		||||
        assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -85,6 +85,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
 | 
			
		||||
        } else {
 | 
			
		||||
            serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
 | 
			
		||||
        }
 | 
			
		||||
        if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) {
 | 
			
		||||
            assignedTenantProfiles = Collections.emptySet();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
       generateNewServiceInfoWithCurrentSystemInfo();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -186,6 +186,21 @@ public class HashPartitionService implements PartitionService {
 | 
			
		||||
        removeTenant(tenantId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public boolean isManagedByCurrentService(TenantId tenantId) {
 | 
			
		||||
        Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
 | 
			
		||||
        if (assignedTenantProfiles.isEmpty()) {
 | 
			
		||||
            // TODO: refactor this for common servers
 | 
			
		||||
            return true;
 | 
			
		||||
        } else {
 | 
			
		||||
            if (tenantId.isSysTenantId()) {
 | 
			
		||||
                return false;
 | 
			
		||||
            }
 | 
			
		||||
            TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId();
 | 
			
		||||
            return assignedTenantProfiles.contains(profileId.getId());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
 | 
			
		||||
        TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
 | 
			
		||||
 | 
			
		||||
@ -64,4 +64,7 @@ public interface PartitionService {
 | 
			
		||||
    void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg);
 | 
			
		||||
 | 
			
		||||
    void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg);
 | 
			
		||||
 | 
			
		||||
    boolean isManagedByCurrentService(TenantId tenantId);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user