Added shutdown of all rule chains on tenant deletion
This commit is contained in:
		
							parent
							
								
									f6b00b35db
								
							
						
					
					
						commit
						e724dcd088
					
				@ -34,9 +34,11 @@ import org.thingsboard.server.actors.service.ContextBasedCreator;
 | 
				
			|||||||
import org.thingsboard.server.actors.service.DefaultActorService;
 | 
					import org.thingsboard.server.actors.service.DefaultActorService;
 | 
				
			||||||
import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager;
 | 
					import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager;
 | 
				
			||||||
import org.thingsboard.server.actors.tenant.TenantActor;
 | 
					import org.thingsboard.server.actors.tenant.TenantActor;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.EntityType;
 | 
				
			||||||
import org.thingsboard.server.common.data.Tenant;
 | 
					import org.thingsboard.server.common.data.Tenant;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
					import org.thingsboard.server.common.data.id.TenantId;
 | 
				
			||||||
import org.thingsboard.server.common.data.page.PageDataIterable;
 | 
					import org.thingsboard.server.common.data.page.PageDataIterable;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
				
			||||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
					import org.thingsboard.server.common.msg.TbActorMsg;
 | 
				
			||||||
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 | 
					import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
 | 
				
			||||||
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 | 
					import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
 | 
				
			||||||
@ -129,7 +131,7 @@ public class AppActor extends RuleChainManagerActor {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
 | 
					    private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) {
 | 
				
			||||||
        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
 | 
					        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
 | 
				
			||||||
            log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT);
 | 
					            log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet: {}", SYSTEM_TENANT, msg);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
 | 
					            getOrCreateTenantActor(msg.getTenantId()).tell(msg, self());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -142,11 +144,21 @@ public class AppActor extends RuleChainManagerActor {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
 | 
					    private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
 | 
				
			||||||
        ActorRef target;
 | 
					        ActorRef target = null;
 | 
				
			||||||
        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
 | 
					        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
 | 
				
			||||||
            target = getEntityActorRef(msg.getEntityId());
 | 
					            target = getEntityActorRef(msg.getEntityId());
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            target = getOrCreateTenantActor(msg.getTenantId());
 | 
					            if (msg.getEntityId().getEntityType() == EntityType.TENANT
 | 
				
			||||||
 | 
					                    && msg.getEvent() == ComponentLifecycleEvent.DELETED) {
 | 
				
			||||||
 | 
					                log.debug("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
 | 
				
			||||||
 | 
					                ActorRef tenantActor = tenantActors.remove(new TenantId(msg.getEntityId().getId()));
 | 
				
			||||||
 | 
					                if (tenantActor != null) {
 | 
				
			||||||
 | 
					                    log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor);
 | 
				
			||||||
 | 
					                    context().stop(tenantActor);
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                target = getOrCreateTenantActor(msg.getTenantId());
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (target != null) {
 | 
					        if (target != null) {
 | 
				
			||||||
            target.tell(msg, ActorRef.noSender());
 | 
					            target.tell(msg, ActorRef.noSender());
 | 
				
			||||||
 | 
				
			|||||||
@ -15,6 +15,7 @@
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
package org.thingsboard.server.actors.ruleChain;
 | 
					package org.thingsboard.server.actors.ruleChain;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import akka.actor.ActorInitializationException;
 | 
				
			||||||
import akka.actor.OneForOneStrategy;
 | 
					import akka.actor.OneForOneStrategy;
 | 
				
			||||||
import akka.actor.SupervisorStrategy;
 | 
					import akka.actor.SupervisorStrategy;
 | 
				
			||||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
					import org.thingsboard.server.actors.ActorSystemContext;
 | 
				
			||||||
 | 
				
			|||||||
@ -96,7 +96,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
				
			|||||||
            log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
 | 
					            log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
 | 
				
			||||||
            // Creating and starting the actors;
 | 
					            // Creating and starting the actors;
 | 
				
			||||||
            for (RuleNode ruleNode : ruleNodeList) {
 | 
					            for (RuleNode ruleNode : ruleNodeList) {
 | 
				
			||||||
                log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
 | 
					                log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
 | 
				
			||||||
                ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
 | 
					                ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
 | 
				
			||||||
                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
 | 
					                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@ -116,11 +116,11 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
				
			|||||||
        for (RuleNode ruleNode : ruleNodeList) {
 | 
					        for (RuleNode ruleNode : ruleNodeList) {
 | 
				
			||||||
            RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
 | 
					            RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
 | 
				
			||||||
            if (existing == null) {
 | 
					            if (existing == null) {
 | 
				
			||||||
                log.trace("[{}][{}] Creating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
 | 
					                log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
 | 
				
			||||||
                ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
 | 
					                ActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
 | 
				
			||||||
                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
 | 
					                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                log.trace("[{}][{}] Updating rule node [{}]: {}", tenantId, entityId, ruleNode.getName(), ruleNode);
 | 
					                log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
 | 
				
			||||||
                existing.setSelf(ruleNode);
 | 
					                existing.setSelf(ruleNode);
 | 
				
			||||||
                existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
 | 
					                existing.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, existing.getSelf().getId(), ComponentLifecycleEvent.UPDATED), self);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@ -184,13 +184,15 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        firstId = ruleChain.getFirstRuleNodeId();
 | 
					        firstId = ruleChain.getFirstRuleNodeId();
 | 
				
			||||||
        firstNode = nodeActors.get(ruleChain.getFirstRuleNodeId());
 | 
					        firstNode = nodeActors.get(firstId);
 | 
				
			||||||
        state = ComponentLifecycleState.ACTIVE;
 | 
					        state = ComponentLifecycleState.ACTIVE;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
 | 
					    void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg envelope) {
 | 
				
			||||||
 | 
					        log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, envelope.getTbMsg().getId(), envelope.getTbMsg());
 | 
				
			||||||
        checkActive();
 | 
					        checkActive();
 | 
				
			||||||
        if (firstNode != null) {
 | 
					        if (firstNode != null) {
 | 
				
			||||||
 | 
					            log.trace("[{}][{}] Pushing message to first rule node", entityId, firstId);
 | 
				
			||||||
            pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
 | 
					            pushMsgToNode(firstNode, enrichWithRuleChainId(envelope.getTbMsg()), "");
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.EntityType;
 | 
				
			|||||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
					import org.thingsboard.server.common.data.id.DeviceId;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
					import org.thingsboard.server.common.data.id.RuleChainId;
 | 
				
			||||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
					import org.thingsboard.server.common.data.id.TenantId;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
				
			||||||
import org.thingsboard.server.common.data.rule.RuleChain;
 | 
					import org.thingsboard.server.common.data.rule.RuleChain;
 | 
				
			||||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
					import org.thingsboard.server.common.msg.TbActorMsg;
 | 
				
			||||||
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
 | 
					import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
 | 
				
			||||||
@ -75,6 +76,11 @@ public class TenantActor extends RuleChainManagerActor {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public void postStop() {
 | 
				
			||||||
 | 
					        log.info("[{}] Stopping tenant actor.", tenantId);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    protected boolean process(TbActorMsg msg) {
 | 
					    protected boolean process(TbActorMsg msg) {
 | 
				
			||||||
        switch (msg.getMsgType()) {
 | 
					        switch (msg.getMsgType()) {
 | 
				
			||||||
 | 
				
			|||||||
@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
				
			|||||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
					import org.thingsboard.server.common.data.id.TenantId;
 | 
				
			||||||
import org.thingsboard.server.common.data.page.TextPageData;
 | 
					import org.thingsboard.server.common.data.page.TextPageData;
 | 
				
			||||||
import org.thingsboard.server.common.data.page.TextPageLink;
 | 
					import org.thingsboard.server.common.data.page.TextPageLink;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
 | 
				
			||||||
import org.thingsboard.server.dao.tenant.TenantService;
 | 
					import org.thingsboard.server.dao.tenant.TenantService;
 | 
				
			||||||
import org.thingsboard.server.service.install.InstallScripts;
 | 
					import org.thingsboard.server.service.install.InstallScripts;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -84,6 +85,8 @@ public class TenantController extends BaseController {
 | 
				
			|||||||
        try {
 | 
					        try {
 | 
				
			||||||
            TenantId tenantId = new TenantId(toUUID(strTenantId));
 | 
					            TenantId tenantId = new TenantId(toUUID(strTenantId));
 | 
				
			||||||
            tenantService.deleteTenant(tenantId);
 | 
					            tenantService.deleteTenant(tenantId);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            actorService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED);
 | 
				
			||||||
        } catch (Exception e) {
 | 
					        } catch (Exception e) {
 | 
				
			||||||
            throw handleException(e);
 | 
					            throw handleException(e);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user