diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index fb6fbbdff2..1461654216 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -73,10 +73,14 @@ public class AppActor extends ContextAwareActor { @Override protected boolean doProcess(TbActorMsg msg) { if (!ruleChainsInitialized) { - initTenantActors(); - ruleChainsInitialized = true; - if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) { - log.warn("Rule Chains initialized by unexpected message: {}", msg); + if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) { + initTenantActors(); + ruleChainsInitialized = true; + } else { + if (!msg.getMsgType().isIgnoreOnStart()) { + log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg); + } + return true; } } switch (msg.getMsgType()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index f8f6a7d25f..51f4d5283f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -259,7 +259,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } void launchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { - consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + if (isReady) { + consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + } else { + scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix); + } + } + + private void scheduleLaunchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { + repartitionExecutor.schedule(() -> { + if (isReady) { + consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + } else { + scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix); + } + }, 10, TimeUnit.SECONDS); } void consumerLoop(TbQueueConsumer> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 2d517a2213..b59086a350 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -68,7 +68,7 @@ public abstract class AbstractConsumerService !ctx.isLocalEntity(entry.getKey())); + initAlarmRuleState(true); } @Override @@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode { deviceStates.clear(); } - protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) { + protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) { DeviceState deviceState = deviceStates.get(deviceId); if (deviceState == null) { DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); if (deviceProfile != null) { deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns); deviceStates.put(deviceId, deviceState); + if (printNewlyAddedDeviceStates) { + log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId); + } } } return deviceState;