diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 4ab54c0ad8..93cf5bd8c7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -17,6 +17,7 @@ package org.thingsboard.server.actors.app; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.ProcessFailureStrategy; import org.thingsboard.server.actors.TbActor; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; @@ -88,7 +89,7 @@ public class AppActor extends ContextAwareActor { case APP_INIT_MSG: break; case PARTITION_CHANGE_MSG: - ctx.broadcastToChildren(msg); + ctx.broadcastToChildren(msg, true); break; case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); @@ -219,6 +220,12 @@ public class AppActor extends ContextAwareActor { } } + @Override + public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) { + log.error("Failed to process msg: {}", msg, t); + return doProcessFailure(t); + } + public static class ActorCreator extends ContextBasedCreator { public ActorCreator(ActorSystemContext context) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index 6f920bc9ab..f2213c02d0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -59,7 +59,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { } protected void initRuleChains() { - ruleChainsInitialized = true; + log.debug("[{}] Initializing rule chains", tenantId); for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) { RuleChainId ruleChainId = ruleChain.getId(); log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId()); @@ -67,9 +67,11 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { visit(ruleChain, actorRef); log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId()); } + ruleChainsInitialized = true; } protected void destroyRuleChains() { + log.debug("[{}] Destroying rule chains", tenantId); for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) { ctx.stop(new TbEntityActorId(ruleChain.getId())); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java index 326411ec91..721e898708 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java @@ -47,8 +47,8 @@ public abstract class ContextAwareActor extends AbstractTbActor { protected abstract boolean doProcess(TbActorMsg msg); @Override - public ProcessFailureStrategy onProcessFailure(Throwable t) { - log.debug("[{}] Processing failure: ", getActorRef().getActorId(), t); + public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) { + log.debug("[{}] Processing failure for msg {}", getActorRef().getActorId(), msg, t); return doProcessFailure(t); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index f766f8813c..dab5d05fc3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -122,7 +122,7 @@ public class DefaultActorService extends TbApplicationEventListener true, msg); + broadcastToChildren(parent, msg, false); + } + + @Override + public void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority) { + broadcastToChildren(parent, id -> true, msg, highPriority); } @Override public void broadcastToChildren(TbActorId parent, Predicate childFilter, TbActorMsg msg) { + broadcastToChildren(parent, childFilter, msg, false); + } + + private void broadcastToChildren(TbActorId parent, Predicate childFilter, TbActorMsg msg, boolean highPriority) { Set children = parentChildMap.get(parent); if (children != null) { - children.stream().filter(childFilter).forEach(id -> tell(id, msg)); + children.stream().filter(childFilter).forEach(id -> { + try { + tell(id, msg, highPriority); + } catch (TbActorNotRegisteredException e) { + log.warn("Actor is missing for {}", id); + } + }); } } @@ -190,6 +205,8 @@ public class DefaultTbActorSystem implements TbActorSystem { stop(child); } } + parentChildMap.values().forEach(parentChildren -> parentChildren.remove(actorId)); + TbActorMailbox mailbox = actors.remove(actorId); if (mailbox != null) { mailbox.destroy(null); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java index 3c0bfea49c..01ae8898ee 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java @@ -34,7 +34,7 @@ public interface TbActor { return InitFailureStrategy.retryWithDelay(5000L * attempt); } - default ProcessFailureStrategy onProcessFailure(Throwable t) { + default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) { if (t instanceof Error) { return ProcessFailureStrategy.stop(); } else { diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java index 3b71daf8c2..ae3a072692 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java @@ -36,6 +36,8 @@ public interface TbActorCtx extends TbActorRef { void broadcastToChildren(TbActorMsg msg); + void broadcastToChildren(TbActorMsg msg, boolean highPriority); + void broadcastToChildrenByType(TbActorMsg msg, EntityType entityType); void broadcastToChildren(TbActorMsg msg, Predicate childFilter); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index 802fc5dccc..776d5657ce 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -160,7 +160,7 @@ public final class TbActorMailbox implements TbActorCtx { destroy(updateException.getCause()); } catch (Throwable t) { log.debug("[{}] Failed to process message: {}", selfId, msg, t); - ProcessFailureStrategy strategy = actor.onProcessFailure(t); + ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t); if (strategy.isStop()) { system.stop(selfId); } @@ -190,7 +190,12 @@ public final class TbActorMailbox implements TbActorCtx { @Override public void broadcastToChildren(TbActorMsg msg) { - system.broadcastToChildren(selfId, msg); + broadcastToChildren(msg, false); + } + + @Override + public void broadcastToChildren(TbActorMsg msg, boolean highPriority) { + system.broadcastToChildren(selfId, msg, highPriority); } @Override diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java index 154f9d8a88..efbfdae832 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java @@ -48,6 +48,8 @@ public interface TbActorSystem { void broadcastToChildren(TbActorId parent, TbActorMsg msg); + void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority); + void broadcastToChildren(TbActorId parent, Predicate childFilter, TbActorMsg msg); List filterChildren(TbActorId parent, Predicate childFilter);