HotFix - fixed init of rule chains - init only on APP_INIT msg

This commit is contained in:
Volodymyr Babak 2023-06-22 16:37:03 +03:00
parent b14459dd9b
commit 371cab26d2
5 changed files with 49 additions and 11 deletions

View File

@ -73,10 +73,14 @@ public class AppActor extends ContextAwareActor {
@Override @Override
protected boolean doProcess(TbActorMsg msg) { protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) { if (!ruleChainsInitialized) {
initTenantActors(); if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
ruleChainsInitialized = true; initTenantActors();
if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) { ruleChainsInitialized = true;
log.warn("Rule Chains initialized by unexpected message: {}", msg); } else {
if (!msg.getMsgType().isIgnoreOnStart()) {
log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
}
return true;
} }
} }
switch (msg.getMsgType()) { switch (msg.getMsgType()) {

View File

@ -259,7 +259,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); if (isReady) {
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
} else {
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
}
}
private void scheduleLaunchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
repartitionExecutor.schedule(() -> {
if (isReady) {
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
} else {
scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
}
}, 10, TimeUnit.SECONDS);
} }
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {

View File

@ -68,7 +68,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected volatile ExecutorService consumersExecutor; protected volatile ExecutorService consumersExecutor;
protected volatile ExecutorService notificationsConsumerExecutor; protected volatile ExecutorService notificationsConsumerExecutor;
protected volatile boolean stopped = false; protected volatile boolean stopped = false;
protected volatile boolean isReady = false;
protected final ActorSystemContext actorContext; protected final ActorSystemContext actorContext;
protected final DataDecodingEncodingService encodingService; protected final DataDecodingEncodingService encodingService;
protected final TbTenantProfileCache tenantProfileCache; protected final TbTenantProfileCache tenantProfileCache;
@ -108,6 +108,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
public void onApplicationEvent(ApplicationReadyEvent event) { public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Subscribing to notifications: {}", nfConsumer.getTopic()); log.info("Subscribing to notifications: {}", nfConsumer.getTopic());
this.nfConsumer.subscribe(); this.nfConsumer.subscribe();
this.isReady = true;
launchNotificationsConsumer(); launchNotificationsConsumer();
launchMainConsumers(); launchMainConsumers();
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.common.msg; package org.thingsboard.server.common.msg;
import lombok.Getter;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -28,7 +29,7 @@ public enum MsgType {
* *
* See {@link PartitionChangeMsg} * See {@link PartitionChangeMsg}
*/ */
PARTITION_CHANGE_MSG, PARTITION_CHANGE_MSG(true),
APP_INIT_MSG, APP_INIT_MSG,
@ -108,7 +109,7 @@ public enum MsgType {
* Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
*/ */
SESSION_TIMEOUT_MSG, SESSION_TIMEOUT_MSG(true),
STATS_PERSIST_TICK_MSG, STATS_PERSIST_TICK_MSG,
@ -130,4 +131,14 @@ public enum MsgType {
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG, EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
@Getter
private final boolean ignoreOnStart;
MsgType() {
this.ignoreOnStart = false;
}
MsgType(boolean ignoreOnStart) {
this.ignoreOnStart = ignoreOnStart;
}
} }

View File

@ -76,6 +76,10 @@ public class TbDeviceProfileNode implements TbNode {
this.ctx = ctx; this.ctx = ctx;
scheduleAlarmHarvesting(ctx, null); scheduleAlarmHarvesting(ctx, null);
ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate); ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate);
initAlarmRuleState(false);
}
private void initAlarmRuleState(boolean printNewlyAddedDeviceStates) {
if (config.isFetchAlarmRulesStateOnStart()) { if (config.isFetchAlarmRulesStateOnStart()) {
log.info("[{}] Fetching alarm rule state", ctx.getSelfId()); log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
int fetchCount = 0; int fetchCount = 0;
@ -86,7 +90,7 @@ public class TbDeviceProfileNode implements TbNode {
for (RuleNodeState rns : states.getData()) { for (RuleNodeState rns : states.getData()) {
fetchCount++; fetchCount++;
if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) { if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) {
getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns); getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns, printNewlyAddedDeviceStates);
} }
} }
} }
@ -130,7 +134,7 @@ public class TbDeviceProfileNode implements TbNode {
removeDeviceState(deviceId); removeDeviceState(deviceId);
ctx.tellSuccess(msg); ctx.tellSuccess(msg);
} else { } else {
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null); DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null, false);
if (deviceState != null) { if (deviceState != null) {
deviceState.process(ctx, msg); deviceState.process(ctx, msg);
} else { } else {
@ -148,6 +152,7 @@ public class TbDeviceProfileNode implements TbNode {
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) { public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// Cleanup the cache for all entities that are no longer assigned to current server partitions // Cleanup the cache for all entities that are no longer assigned to current server partitions
deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey())); deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
initAlarmRuleState(true);
} }
@Override @Override
@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode {
deviceStates.clear(); deviceStates.clear();
} }
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) { protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) {
DeviceState deviceState = deviceStates.get(deviceId); DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState == null) { if (deviceState == null) {
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
if (deviceProfile != null) { if (deviceProfile != null) {
deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns); deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns);
deviceStates.put(deviceId, deviceState); deviceStates.put(deviceId, deviceState);
if (printNewlyAddedDeviceStates) {
log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId);
}
} }
} }
return deviceState; return deviceState;