Minor refactoring for Rule Engine consumers

This commit is contained in:
ViacheslavKlimov 2025-03-26 11:32:26 +02:00
parent 8c2b4516ac
commit 66fd0fc4e9
4 changed files with 45 additions and 43 deletions

View File

@ -147,9 +147,12 @@ public class DefaultEdqsService implements EdqsService {
syncLock.lock(); syncLock.lock();
try { try {
EdqsSyncState syncState = getSyncState(); EdqsSyncState syncState = getSyncState();
if (syncState != null && syncState.getStatus() == EdqsSyncStatus.FINISHED) { if (syncState != null) {
log.info("EDQS sync is already finished"); EdqsSyncStatus status = syncState.getStatus();
return; if (status == EdqsSyncStatus.FINISHED || status == EdqsSyncStatus.FAILED) {
log.info("EDQS sync is already " + status + ", ignoring the msg");
return;
}
} }
saveSyncState(EdqsSyncStatus.STARTED); saveSyncState(EdqsSyncStatus.STARTED);

View File

@ -54,7 +54,7 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService; import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
@ -71,7 +71,7 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j @Slf4j
public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPartitionedService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService { public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
@Value("${queue.calculated_fields.poll_interval:25}") @Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval; private long pollInterval;
@ -99,7 +99,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
} }
@Override @Override
protected void doAfterStartUp() { protected void onStartUp() {
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create() PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(queueKey) .queueKey(queueKey)
@ -126,7 +126,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
} }
@Override @Override
protected void processPartitionChangeEvent(PartitionChangeEvent event) { protected void onPartitionChangeEvent(PartitionChangeEvent event) {
try { try {
event.getNewPartitions().forEach((queueKey, partitions) -> { event.getNewPartitions().forEach((queueKey, partitions) -> {
if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) { if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) {
@ -143,11 +143,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
} }
} }
@Override
protected String getPrefix() {
return "tb-cf";
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception { private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect( ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(
@ -195,6 +190,11 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPar
return ServiceType.TB_RULE_ENGINE; return ServiceType.TB_RULE_ENGINE;
} }
@Override
protected String getPrefix() {
return "tb-cf";
}
@Override @Override
protected long getNotificationPollDuration() { protected long getNotificationPollDuration() {
return pollInterval; return pollInterval;

View File

@ -49,7 +49,7 @@ import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService; import org.thingsboard.server.service.queue.processing.AbstractPartitionBasedConsumerService;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
@ -66,7 +66,7 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j @Slf4j
public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitionedService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService { public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
private final TbRuleEngineConsumerContext ctx; private final TbRuleEngineConsumerContext ctx;
private final QueueService queueService; private final QueueService queueService;
@ -93,7 +93,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
} }
@Override @Override
protected void doAfterStartUp() { protected void onStartUp() {
List<Queue> queues = queueService.findAllQueues(); List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) { for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
@ -104,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
} }
@Override @Override
protected void processPartitionChangeEvent(PartitionChangeEvent event) { protected void onPartitionChangeEvent(PartitionChangeEvent event) {
event.getNewPartitions().forEach((queueKey, partitions) -> { event.getNewPartitions().forEach((queueKey, partitions) -> {
if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) { if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
return; return;
@ -136,11 +136,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
}); });
} }
@Override
protected String getPrefix() {
return "tb-rule-engine";
}
@Override @Override
protected void stopConsumers() { protected void stopConsumers() {
super.stopConsumers(); super.stopConsumers();
@ -153,6 +148,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitio
return ServiceType.TB_RULE_ENGINE; return ServiceType.TB_RULE_ENGINE;
} }
@Override
protected String getPrefix() {
return "tb-rule-engine";
}
@Override @Override
protected long getNotificationPollDuration() { protected long getNotificationPollDuration() {
return ctx.getPollDuration(); return ctx.getPollDuration();

View File

@ -31,24 +31,22 @@ import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsServ
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractConsumerPartitionedService<N extends com.google.protobuf.GeneratedMessageV3> extends AbstractConsumerService<N> { public abstract class AbstractPartitionBasedConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends AbstractConsumerService<N> {
private final Lock startupLock; private final Lock startupLock = new ReentrantLock();
private volatile boolean consumersInitialized; private volatile boolean started = false;
private PartitionChangeEvent lastPartitionChangeEvent; private PartitionChangeEvent lastPartitionChangeEvent;
public AbstractConsumerPartitionedService(ActorSystemContext actorContext, public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache, TbTenantProfileCache tenantProfileCache,
TbDeviceProfileCache deviceProfileCache, TbDeviceProfileCache deviceProfileCache,
TbAssetProfileCache assetProfileCache, TbAssetProfileCache assetProfileCache,
CalculatedFieldCache calculatedFieldCache, CalculatedFieldCache calculatedFieldCache,
TbApiUsageStateService apiUsageStateService, TbApiUsageStateService apiUsageStateService,
PartitionService partitionService, PartitionService partitionService,
ApplicationEventPublisher eventPublisher, ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService) { JwtSettingsService jwtSettingsService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService); super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
this.startupLock = new ReentrantLock();
this.consumersInitialized = false;
} }
@PostConstruct @PostConstruct
@ -57,13 +55,14 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
} }
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE) @AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
@Override
public void afterStartUp() { public void afterStartUp() {
super.afterStartUp(); super.afterStartUp();
doAfterStartUp(); onStartUp();
startupLock.lock(); startupLock.lock();
try { try {
processPartitionChangeEvent(lastPartitionChangeEvent); onPartitionChangeEvent(lastPartitionChangeEvent);
consumersInitialized = true; started = true;
} finally { } finally {
startupLock.unlock(); startupLock.unlock();
} }
@ -71,10 +70,10 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (!consumersInitialized) { if (!started) {
startupLock.lock(); startupLock.lock();
try { try {
if (!consumersInitialized) { if (!started) {
lastPartitionChangeEvent = event; lastPartitionChangeEvent = event;
return; return;
} }
@ -82,12 +81,12 @@ public abstract class AbstractConsumerPartitionedService<N extends com.google.pr
startupLock.unlock(); startupLock.unlock();
} }
} }
processPartitionChangeEvent(event); onPartitionChangeEvent(event);
} }
protected abstract void doAfterStartUp(); protected abstract void onStartUp();
protected abstract void processPartitionChangeEvent(PartitionChangeEvent event); protected abstract void onPartitionChangeEvent(PartitionChangeEvent event);
protected abstract String getPrefix(); protected abstract String getPrefix();