From e724dcd0885380fa016a1a9373f4fe05af171d70 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 2 Nov 2018 15:11:42 +0200 Subject: [PATCH 1/2] Added shutdown of all rule chains on tenant deletion --- .../server/actors/app/AppActor.java | 18 +++++++++++++++--- .../actors/ruleChain/RuleChainActor.java | 1 + .../RuleChainActorMessageProcessor.java | 10 ++++++---- .../server/actors/tenant/TenantActor.java | 6 ++++++ .../server/controller/TenantController.java | 3 +++ 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 6df37d6731..915704eedd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -34,9 +34,11 @@ import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.rulechain.SystemRuleChainManager; import org.thingsboard.server.actors.tenant.TenantActor; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; @@ -129,7 +131,7 @@ public class AppActor extends RuleChainManagerActor { private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { if (SYSTEM_TENANT.equals(msg.getTenantId())) { - log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT); + log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet: {}", SYSTEM_TENANT, msg); } else { getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); } @@ -142,11 +144,21 @@ public class AppActor extends RuleChainManagerActor { } private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { - ActorRef target; + ActorRef target = null; if (SYSTEM_TENANT.equals(msg.getTenantId())) { target = getEntityActorRef(msg.getEntityId()); } else { - target = getOrCreateTenantActor(msg.getTenantId()); + if (msg.getEntityId().getEntityType() == EntityType.TENANT + && msg.getEvent() == ComponentLifecycleEvent.DELETED) { + log.debug("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); + ActorRef tenantActor = tenantActors.remove(new TenantId(msg.getEntityId().getId())); + if (tenantActor != null) { + log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor); + context().stop(tenantActor); + } + } else { + target = getOrCreateTenantActor(msg.getTenantId()); + } } if (target != null) { target.tell(msg, ActorRef.noSender()); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java index ed765e04f8..be393204ed 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors.ruleChain; +import akka.actor.ActorInitializationException; import akka.actor.OneForOneStrategy; import akka.actor.SupervisorStrategy; import org.thingsboard.server.actors.ActorSystemContext; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index e4e82c55e5..5c6c676177 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -96,7 +96,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor Date: Fri, 2 Nov 2018 15:12:30 +0200 Subject: [PATCH 2/2] Added session registration #1212 --- .../server/transport/mqtt/session/GatewaySessionHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index ac33ba677e..37fb84b3e6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -119,10 +119,10 @@ public class GatewaySessionHandler { GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); } future.set(devices.get(deviceName)); }