Merge pull request #13008 from YevhenBondarenko/feature/startap-improvements

Refactored initialization chain, PartitionChangeEvent should be processed after actors startup
This commit is contained in:
Viacheslav Klimov 2025-03-26 11:15:22 +02:00 committed by GitHub
commit cf78af9790
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 114 additions and 15 deletions

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.service.queue; package org.thingsboard.server.service.queue;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@ -55,7 +54,7 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
@ -72,7 +71,7 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j @Slf4j
public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService { public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerPartitionedService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
@Value("${queue.calculated_fields.poll_interval:25}") @Value("${queue.calculated_fields.poll_interval:25}")
private long pollInterval; private long pollInterval;
@ -99,10 +98,8 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
this.stateService = stateService; this.stateService = stateService;
} }
@PostConstruct @Override
public void init() { protected void doAfterStartUp() {
super.init("tb-cf");
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create() PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(queueKey) .queueKey(queueKey)
@ -129,7 +126,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
} }
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void processPartitionChangeEvent(PartitionChangeEvent event) {
try { try {
event.getNewPartitions().forEach((queueKey, partitions) -> { event.getNewPartitions().forEach((queueKey, partitions) -> {
if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) { if (queueKey.getQueueName().equals(DataConstants.CF_QUEUE_NAME)) {
@ -146,6 +143,11 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
} }
} }
@Override
protected String getPrefix() {
return "tb-cf";
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception { private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect( ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.service.queue; package org.thingsboard.server.service.queue;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
@ -50,7 +49,7 @@ import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerPartitionedService;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineConsumerContext;
import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager; import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumerManager;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
@ -67,7 +66,7 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j @Slf4j
public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService { public class DefaultTbRuleEngineConsumerService extends AbstractConsumerPartitionedService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
private final TbRuleEngineConsumerContext ctx; private final TbRuleEngineConsumerContext ctx;
private final QueueService queueService; private final QueueService queueService;
@ -93,9 +92,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
this.queueService = queueService; this.queueService = queueService;
} }
@PostConstruct @Override
public void init() { protected void doAfterStartUp() {
super.init("tb-rule-engine");
List<Queue> queues = queueService.findAllQueues(); List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) { for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
@ -106,7 +104,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void processPartitionChangeEvent(PartitionChangeEvent event) {
event.getNewPartitions().forEach((queueKey, partitions) -> { event.getNewPartitions().forEach((queueKey, partitions) -> {
if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) { if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
return; return;
@ -138,6 +136,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}); });
} }
@Override
protected String getPrefix() {
return "tb-rule-engine";
}
@Override @Override
protected void stopConsumers() { protected void stopConsumers() {
super.stopConsumers(); super.stopConsumers();

View File

@ -0,0 +1,94 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue.processing;
import jakarta.annotation.PostConstruct;
import org.springframework.context.ApplicationEventPublisher;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractConsumerPartitionedService<N extends com.google.protobuf.GeneratedMessageV3> extends AbstractConsumerService<N> {
private final Lock startupLock;
private volatile boolean consumersInitialized;
private PartitionChangeEvent lastPartitionChangeEvent;
public AbstractConsumerPartitionedService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache,
TbDeviceProfileCache deviceProfileCache,
TbAssetProfileCache assetProfileCache,
CalculatedFieldCache calculatedFieldCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService,
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
this.startupLock = new ReentrantLock();
this.consumersInitialized = false;
}
@PostConstruct
public void init() {
super.init(getPrefix());
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
super.afterStartUp();
doAfterStartUp();
startupLock.lock();
try {
processPartitionChangeEvent(lastPartitionChangeEvent);
consumersInitialized = true;
} finally {
startupLock.unlock();
}
}
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (!consumersInitialized) {
startupLock.lock();
try {
if (!consumersInitialized) {
lastPartitionChangeEvent = event;
return;
}
} finally {
startupLock.unlock();
}
}
processPartitionChangeEvent(event);
}
protected abstract void doAfterStartUp();
protected abstract void processPartitionChangeEvent(PartitionChangeEvent event);
protected abstract String getPrefix();
}