Fix actor msg broadcast

This commit is contained in:
ViacheslavKlimov 2024-02-23 14:00:43 +02:00
parent efca0ecc3d
commit 10c13bf5b4
10 changed files with 54 additions and 12 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.actors.app;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ProcessFailureStrategy;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
@ -88,7 +89,7 @@ public class AppActor extends ContextAwareActor {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg);
ctx.broadcastToChildren(msg, true);
break;
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
@ -219,6 +220,12 @@ public class AppActor extends ContextAwareActor {
}
}
@Override
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
log.error("Failed to process msg: {}", msg, t);
return doProcessFailure(t);
}
public static class ActorCreator extends ContextBasedCreator {
public ActorCreator(ActorSystemContext context) {

View File

@ -59,7 +59,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
}
protected void initRuleChains() {
ruleChainsInitialized = true;
log.debug("[{}] Initializing rule chains", tenantId);
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
@ -67,9 +67,11 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
ruleChainsInitialized = true;
}
protected void destroyRuleChains() {
log.debug("[{}] Destroying rule chains", tenantId);
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
ctx.stop(new TbEntityActorId(ruleChain.getId()));
}

View File

@ -47,8 +47,8 @@ public abstract class ContextAwareActor extends AbstractTbActor {
protected abstract boolean doProcess(TbActorMsg msg);
@Override
public ProcessFailureStrategy onProcessFailure(Throwable t) {
log.debug("[{}] Processing failure: ", getActorRef().getActorId(), t);
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
log.debug("[{}] Processing failure for msg {}", getActorRef().getActorId(), msg, t);
return doProcessFailure(t);
}

View File

@ -122,7 +122,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.info("Received partition change event.");
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
}
@Override

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.actors.tenant;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.ProcessFailureStrategy;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
@ -183,7 +184,7 @@ public class TenantActor extends RuleChainManagerActor {
return;
}
TbMsg tbMsg = msg.getMsg();
if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
if (getApiUsageState().isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
@ -207,7 +208,7 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onRuleChainMsg(RuleChainAwareMsg msg) {
if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
if (getApiUsageState().isReExecEnabled()) {
getOrCreateActor(msg.getRuleChainId()).tell(msg);
}
}
@ -319,6 +320,12 @@ public class TenantActor extends RuleChainManagerActor {
return apiUsageState;
}
@Override
public ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
log.error("[{}] Failed to process msg: {}", tenantId, msg, t);
return doProcessFailure(t);
}
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;

View File

@ -156,14 +156,29 @@ public class DefaultTbActorSystem implements TbActorSystem {
@Override
public void broadcastToChildren(TbActorId parent, TbActorMsg msg) {
broadcastToChildren(parent, id -> true, msg);
broadcastToChildren(parent, msg, false);
}
@Override
public void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority) {
broadcastToChildren(parent, id -> true, msg, highPriority);
}
@Override
public void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg) {
broadcastToChildren(parent, childFilter, msg, false);
}
private void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg, boolean highPriority) {
Set<TbActorId> children = parentChildMap.get(parent);
if (children != null) {
children.stream().filter(childFilter).forEach(id -> tell(id, msg));
children.stream().filter(childFilter).forEach(id -> {
try {
tell(id, msg, highPriority);
} catch (TbActorNotRegisteredException e) {
log.warn("Actor is missing for {}", id);
}
});
}
}
@ -190,6 +205,8 @@ public class DefaultTbActorSystem implements TbActorSystem {
stop(child);
}
}
parentChildMap.values().forEach(parentChildren -> parentChildren.remove(actorId));
TbActorMailbox mailbox = actors.remove(actorId);
if (mailbox != null) {
mailbox.destroy(null);

View File

@ -34,7 +34,7 @@ public interface TbActor {
return InitFailureStrategy.retryWithDelay(5000L * attempt);
}
default ProcessFailureStrategy onProcessFailure(Throwable t) {
default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
if (t instanceof Error) {
return ProcessFailureStrategy.stop();
} else {

View File

@ -36,6 +36,8 @@ public interface TbActorCtx extends TbActorRef {
void broadcastToChildren(TbActorMsg msg);
void broadcastToChildren(TbActorMsg msg, boolean highPriority);
void broadcastToChildrenByType(TbActorMsg msg, EntityType entityType);
void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter);

View File

@ -160,7 +160,7 @@ public final class TbActorMailbox implements TbActorCtx {
destroy(updateException.getCause());
} catch (Throwable t) {
log.debug("[{}] Failed to process message: {}", selfId, msg, t);
ProcessFailureStrategy strategy = actor.onProcessFailure(t);
ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t);
if (strategy.isStop()) {
system.stop(selfId);
}
@ -190,7 +190,12 @@ public final class TbActorMailbox implements TbActorCtx {
@Override
public void broadcastToChildren(TbActorMsg msg) {
system.broadcastToChildren(selfId, msg);
broadcastToChildren(msg, false);
}
@Override
public void broadcastToChildren(TbActorMsg msg, boolean highPriority) {
system.broadcastToChildren(selfId, msg, highPriority);
}
@Override

View File

@ -48,6 +48,8 @@ public interface TbActorSystem {
void broadcastToChildren(TbActorId parent, TbActorMsg msg);
void broadcastToChildren(TbActorId parent, TbActorMsg msg, boolean highPriority);
void broadcastToChildren(TbActorId parent, Predicate<TbActorId> childFilter, TbActorMsg msg);
List<TbActorId> filterChildren(TbActorId parent, Predicate<TbActorId> childFilter);