Use QueueConsumerManager for other consumers

This commit is contained in:
ViacheslavKlimov 2024-03-20 13:25:30 +02:00
parent 2174a0f3d7
commit 47cfebe2a7
8 changed files with 96 additions and 125 deletions

View File

@ -90,8 +90,8 @@ import org.thingsboard.server.service.notification.NotificationSchedulerService;
import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.consumer.BasicQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager; import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService; import org.thingsboard.server.service.resource.TbImageService;
@ -151,9 +151,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbImageService imageService; private final TbImageService imageService;
private final TbCoreConsumerStats stats; private final TbCoreConsumerStats stats;
private QueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer; private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
private BasicQueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer; private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private BasicQueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer; private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
private volatile ListeningExecutorService deviceActivityEventsExecutor; private volatile ListeningExecutorService deviceActivityEventsExecutor;
@ -199,7 +199,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
super.init("tb-core"); super.init("tb-core");
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor"))); this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
this.mainConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder() this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder()
.queueKey(new QueueKey(ServiceType.TB_CORE)) .queueKey(new QueueKey(ServiceType.TB_CORE))
.config(CoreQueueConfig.of(consumerPerPartition, (int) pollInterval)) .config(CoreQueueConfig.of(consumerPerPartition, (int) pollInterval))
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
@ -208,19 +208,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
.scheduler(scheduler) .scheduler(scheduler)
.taskExecutor(mgmtExecutor) .taskExecutor(mgmtExecutor)
.build(); .build();
this.usageStatsConsumer = BasicQueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder() this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder()
.key("usage-stats") .key("usage-stats")
.name("TB Usage Stats") .name("TB Usage Stats")
.pollInterval(pollInterval)
.msgPackProcessor(this::processUsageStatsMsg) .msgPackProcessor(this::processUsageStatsMsg)
.pollInterval(pollInterval)
.consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer) .consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.build(); .build();
this.firmwareStatesConsumer = BasicQueueConsumerManager.<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>>builder() this.firmwareStatesConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>>builder()
.key("firmware") .key("firmware")
.name("TB Ota Package States") .name("TB Ota Package States")
.pollInterval(pollInterval)
.msgPackProcessor(this::processFirmwareMsgs) .msgPackProcessor(this::processFirmwareMsgs)
.pollInterval(pollInterval)
.consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer) .consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.build(); .build();

View File

@ -45,7 +45,7 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
public class QueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> { public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
protected final QueueKey queueKey; protected final QueueKey queueKey;
@Getter @Getter
@ -65,8 +65,7 @@ public class QueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
protected volatile boolean stopped; protected volatile boolean stopped;
@Builder @Builder
public QueueConsumerManager(QueueKey queueKey, public MainQueueConsumerManager(QueueKey queueKey, C config,
C config,
MsgPackProcessor<M, C> msgPackProcessor, MsgPackProcessor<M, C> msgPackProcessor,
Function<C, TbQueueConsumer<M>> consumerCreator, Function<C, TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor, ExecutorService consumerExecutor,

View File

@ -46,7 +46,7 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.TbPackCallback; import org.thingsboard.server.service.queue.TbPackCallback;
import org.thingsboard.server.service.queue.TbPackProcessingContext; import org.thingsboard.server.service.queue.TbPackProcessingContext;
import org.thingsboard.server.service.queue.consumer.BasicQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.List; import java.util.List;
@ -73,7 +73,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected final ApplicationEventPublisher eventPublisher; protected final ApplicationEventPublisher eventPublisher;
protected final JwtSettingsService jwtSettingsService; protected final JwtSettingsService jwtSettingsService;
protected BasicQueueConsumerManager<TbProtoQueueMsg<N>> nfConsumer; protected QueueConsumerManager<TbProtoQueueMsg<N>> nfConsumer;
protected ExecutorService consumersExecutor; protected ExecutorService consumersExecutor;
protected ExecutorService mgmtExecutor; protected ExecutorService mgmtExecutor;
@ -84,11 +84,11 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt"); this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler")); this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler"));
this.nfConsumer = BasicQueueConsumerManager.<TbProtoQueueMsg<N>>builder() this.nfConsumer = QueueConsumerManager.<TbProtoQueueMsg<N>>builder()
.key("notifications") .key("notifications")
.name("TB Notifications") .name("TB Notifications")
.pollInterval(getNotificationPollDuration())
.msgPackProcessor(this::processNotifications) .msgPackProcessor(this::processNotifications)
.pollInterval(getNotificationPollDuration())
.consumerCreator(this::createNotificationsConsumer) .consumerCreator(this::createNotificationsConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.build(); .build();

View File

@ -37,7 +37,7 @@ import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback; import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext; import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager; import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy; import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
@ -55,7 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
public class TbRuleEngineQueueConsumerManager extends QueueConsumerManager<TbProtoQueueMsg<ToRuleEngineMsg>, Queue> { public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<TbProtoQueueMsg<ToRuleEngineMsg>, Queue> {
public static final String SUCCESSFUL_STATUS = "successful"; public static final String SUCCESSFUL_STATUS = "successful";
public static final String FAILED_STATUS = "failed"; public static final String FAILED_STATUS = "failed";

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.queue.consumer; package org.thingsboard.server.queue.common.consumer;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
@ -29,12 +29,12 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Supplier; import java.util.function.Supplier;
@Slf4j @Slf4j
public class BasicQueueConsumerManager<M extends TbQueueMsg> { public class QueueConsumerManager<M extends TbQueueMsg> {
private final String key; private final String key;
private final String name; private final String name;
private final long pollInterval;
private final MsgPackProcessor<M> msgPackProcessor; private final MsgPackProcessor<M> msgPackProcessor;
private final long pollInterval;
private final ExecutorService consumerExecutor; private final ExecutorService consumerExecutor;
@Getter @Getter
@ -42,10 +42,8 @@ public class BasicQueueConsumerManager<M extends TbQueueMsg> {
private volatile boolean stopped; private volatile boolean stopped;
@Builder @Builder
public BasicQueueConsumerManager(String key, String name, public QueueConsumerManager(String key, String name, MsgPackProcessor<M> msgPackProcessor,
long pollInterval, long pollInterval, Supplier<TbQueueConsumer<M>> consumerCreator,
MsgPackProcessor<M> msgPackProcessor,
Supplier<TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor) { ExecutorService consumerExecutor) {
this.key = key; this.key = key;
this.name = name; this.name = name;

View File

@ -20,6 +20,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -96,6 +98,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.TopicService;
@ -105,14 +108,12 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbTransportComponent; import org.thingsboard.server.queue.util.TbTransportComponent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -179,19 +180,17 @@ public class DefaultTransportService extends TransportActivityManager implements
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate; protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer; protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer;
protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer; protected QueueConsumerManager<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
protected MessagesStats ruleEngineProducerStats; protected MessagesStats ruleEngineProducerStats;
protected MessagesStats tbCoreProducerStats; protected MessagesStats tbCoreProducerStats;
protected MessagesStats transportApiStats; protected MessagesStats transportApiStats;
protected ExecutorService transportCallbackExecutor; protected ExecutorService transportCallbackExecutor;
private ExecutorService mainConsumerExecutor; private ExecutorService consumerExecutor;
private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>(); private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
private volatile boolean stopped = false;
public DefaultTransportService(PartitionService partitionService, public DefaultTransportService(PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider, TbServiceInfoProvider serviceInfoProvider,
TbTransportQueueFactory queueProvider, TbTransportQueueFactory queueProvider,
@ -232,42 +231,34 @@ public class DefaultTransportService extends TransportActivityManager implements
transportApiRequestTemplate.setMessagesStats(transportApiStats); transportApiRequestTemplate.setMessagesStats(transportApiStats);
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
transportApiRequestTemplate.init(); transportApiRequestTemplate.init();
mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("consumer"));
transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
.key("transport")
.name("TB Transport")
.msgPackProcessor(this::processNotificationMsgs)
.pollInterval(notificationsPollDuration)
.consumerCreator(queueProvider::createTransportNotificationsConsumer)
.consumerExecutor(consumerExecutor)
.build();
} }
@AfterStartUp(order = AfterStartUp.TRANSPORT_SERVICE) @AfterStartUp(order = AfterStartUp.TRANSPORT_SERVICE)
public void start() { public void start() {
mainConsumerExecutor.execute(() -> { TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
while (!stopped) { transportNotificationsConsumer.subscribe(Set.of(tpi));
try { transportNotificationsConsumer.launch();
List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
if (records.size() == 0) {
continue;
} }
records.forEach(record -> {
private void processNotificationMsgs(List<TbProtoQueueMsg<ToTransportMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> consumer) {
msgs.forEach(msg -> {
try { try {
processToTransportMsg(record.getValue()); processToTransportMsg(msg.getValue());
} catch (Throwable e) { } catch (Throwable e) {
log.warn("Failed to process the notification.", e); log.warn("Failed to process the notification.", e);
} }
}); });
transportNotificationsConsumer.commit(); consumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(notificationsPollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
});
} }
private void invalidateRateLimits() { private void invalidateRateLimits() {
@ -276,16 +267,14 @@ public class DefaultTransportService extends TransportActivityManager implements
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
stopped = true;
if (transportNotificationsConsumer != null) { if (transportNotificationsConsumer != null) {
transportNotificationsConsumer.unsubscribe(); transportNotificationsConsumer.stop();
} }
if (transportCallbackExecutor != null) { if (transportCallbackExecutor != null) {
transportCallbackExecutor.shutdownNow(); transportCallbackExecutor.shutdownNow();
} }
if (mainConsumerExecutor != null) { if (consumerExecutor != null) {
mainConsumerExecutor.shutdownNow(); consumerExecutor.shutdownNow();
} }
if (transportApiRequestTemplate != null) { if (transportApiRequestTemplate != null) {
transportApiRequestTemplate.stop(); transportApiRequestTemplate.stop();

View File

@ -53,7 +53,7 @@ public class ThingsBoardThreadFactory implements ThreadFactory {
public static void addThreadNamePrefix(String prefix) { public static void addThreadNamePrefix(String prefix) {
String name = Thread.currentThread().getName(); String name = Thread.currentThread().getName();
name = prefix + name; name = prefix + "-" + name;
Thread.currentThread().setName(name); Thread.currentThread().setName(name);
} }

View File

@ -21,14 +21,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.jgit.errors.LargeObjectException; import org.eclipse.jgit.errors.LargeObjectException;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
@ -68,16 +67,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionedEntityInfoP
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory; import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbVersionControlComponent; import org.thingsboard.server.queue.util.TbVersionControlComponent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -115,9 +114,8 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private final Map<TenantId, PendingCommit> pendingCommitMap = new HashMap<>(); private final Map<TenantId, PendingCommit> pendingCommitMap = new HashMap<>();
private volatile ExecutorService consumerExecutor; private volatile ExecutorService consumerExecutor;
private volatile TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer; private volatile QueueConsumerManager<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer;
private volatile TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> producer; private volatile TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> producer;
private volatile boolean stopped = false;
@Value("${queue.vc.poll-interval:25}") @Value("${queue.vc.poll-interval:25}")
private long pollDuration; private long pollDuration;
@ -134,20 +132,26 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
@PostConstruct @PostConstruct
public void init() { public void init() {
consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("vc-consumer")); consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("consumer"));
var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread"); var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
for (int i = 0; i < ioPoolSize; i++) { for (int i = 0; i < ioPoolSize; i++) {
ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory))); ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory)));
} }
producer = producerProvider.getTbCoreNotificationsMsgProducer(); producer = producerProvider.getTbCoreNotificationsMsgProducer();
consumer = queueFactory.createToVersionControlMsgConsumer(); consumer = QueueConsumerManager.<TbProtoQueueMsg<ToVersionControlServiceMsg>>builder()
.key("vc")
.name("TB Version Control")
.msgPackProcessor(this::processMsgs)
.pollInterval(pollDuration)
.consumerCreator(queueFactory::createToVersionControlMsgConsumer)
.consumerExecutor(consumerExecutor)
.build();
} }
@PreDestroy @PreDestroy
public void stop() { public void stop() {
stopped = true;
if (consumer != null) { if (consumer != null) {
consumer.unsubscribe(); consumer.stop();
} }
if (consumerExecutor != null) { if (consumerExecutor != null) {
consumerExecutor.shutdownNow(); consumerExecutor.shutdownNow();
@ -179,20 +183,13 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
return ServiceType.TB_VC_EXECUTOR.equals(event.getServiceType()); return ServiceType.TB_VC_EXECUTOR.equals(event.getServiceType());
} }
@EventListener(ApplicationReadyEvent.class) @AfterStartUp(order = 2)
@Order(value = 2) public void afterStartUp() {
public void onApplicationEvent(ApplicationReadyEvent event) { consumer.launch();
consumerExecutor.execute(() -> consumerLoop(consumer));
} }
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer) { void processMsgs(List<TbProtoQueueMsg<ToVersionControlServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer) throws Exception {
while (!stopped && !consumer.isStopped()) {
List<ListenableFuture<?>> futures = new ArrayList<>(); List<ListenableFuture<?>> futures = new ArrayList<>();
try {
List<TbProtoQueueMsg<ToVersionControlServiceMsg>> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) { for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) {
ToVersionControlServiceMsg msg = msgWrapper.getValue(); ToVersionControlServiceMsg msg = msgWrapper.getValue();
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings())); var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings()));
@ -209,18 +206,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
log.info("Timeout for processing the version control tasks.", e); log.info("Timeout for processing the version control tasks.", e);
} }
consumer.commit(); consumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain version control requests from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new version control messages", e2);
}
}
}
}
log.info("TB Version Control request consumer stopped.");
} }
private Void processMessage(VersionControlRequestCtx ctx, ToVersionControlServiceMsg msg) { private Void processMessage(VersionControlRequestCtx ctx, ToVersionControlServiceMsg msg) {
@ -273,7 +258,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path) var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path)
.stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList()); .stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList());
if (!ids.isEmpty()) { if (!ids.isEmpty()) {
for (int i = 0; i < ids.size(); i++){ for (int i = 0; i < ids.size(); i++) {
VersionedEntityInfo info = ids.get(i); VersionedEntityInfo info = ids.get(i);
var data = vcService.getFileContentAtCommit(ctx.getTenantId(), var data = vcService.getFileContentAtCommit(ctx.getTenantId(),
getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId()); getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId());