From 15d189ead730245e29dcf07d959e34cfb8693e48 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 7 May 2024 15:13:52 +0300 Subject: [PATCH] Refactoring for consumer manager --- .../HousekeeperReprocessingService.java | 2 +- .../housekeeper/HousekeeperService.java | 2 +- .../queue/DefaultTbCoreConsumerService.java | 4 +- .../queue/consumer/QueueConsumerManager.java | 109 ------------------ .../processing/AbstractConsumerService.java | 2 +- .../common/consumer/QueueConsumerManager.java | 13 ++- .../service/DefaultTransportService.java | 3 +- .../DefaultClusterVersionControlService.java | 3 +- 8 files changed, 15 insertions(+), 123 deletions(-) delete mode 100644 application/src/main/java/org/thingsboard/server/service/queue/consumer/QueueConsumerManager.java diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java index ae16f925a0..37a55c99d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.java @@ -27,11 +27,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.housekeeper.HousekeeperConfig; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.queue.consumer.QueueConsumerManager; import javax.annotation.PreDestroy; import java.util.LinkedHashSet; diff --git a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java index 06e38d0160..eea92eb75b 100644 --- a/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java +++ b/application/src/main/java/org/thingsboard/server/service/housekeeper/HousekeeperService.java @@ -27,13 +27,13 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.housekeeper.HousekeeperConfig; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor; import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService; -import org.thingsboard.server.service.queue.consumer.QueueConsumerManager; import javax.annotation.PreDestroy; import java.util.List; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index abfcd47a46..fbef0cfd29 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -210,20 +210,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService>builder() - .key("usage-stats") .name("TB Usage Stats") .msgPackProcessor(this::processUsageStatsMsg) .pollInterval(pollInterval) .consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer) .consumerExecutor(consumersExecutor) + .threadPrefix("usage-stats") .build(); this.firmwareStatesConsumer = QueueConsumerManager.>builder() - .key("firmware") .name("TB Ota Package States") .msgPackProcessor(this::processFirmwareMsgs) .pollInterval(pollInterval) .consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer) .consumerExecutor(consumersExecutor) + .threadPrefix("firmware") .build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/consumer/QueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/consumer/QueueConsumerManager.java deleted file mode 100644 index e7b3a2f2b7..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/queue/consumer/QueueConsumerManager.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.queue.consumer; - -import lombok.Builder; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.queue.TbQueueConsumer; -import org.thingsboard.server.queue.TbQueueMsg; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; - -@Slf4j -public class QueueConsumerManager { - - private final String name; - private final MsgPackProcessor msgPackProcessor; - private final long pollInterval; - private final ExecutorService consumerExecutor; - private final String threadPrefix; - - @Getter - private final TbQueueConsumer consumer; - private volatile boolean stopped; - - @Builder - public QueueConsumerManager(String name, MsgPackProcessor msgPackProcessor, - long pollInterval, Supplier> consumerCreator, - ExecutorService consumerExecutor, String threadPrefix) { - this.name = name; - this.pollInterval = pollInterval; - this.msgPackProcessor = msgPackProcessor; - this.consumerExecutor = consumerExecutor; - this.threadPrefix = threadPrefix; - this.consumer = consumerCreator.get(); - } - - public void subscribe() { - consumer.subscribe(); - } - - public void subscribe(Set partitions) { - consumer.subscribe(partitions); - } - - public void launch() { - log.info("[{}] Launching consumer", name); - consumerExecutor.submit(() -> { - if (threadPrefix != null) { - ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix); - } - try { - consumerLoop(consumer); - } catch (Throwable e) { - log.error("Failure in consumer loop", e); - } - log.info("[{}] Consumer stopped", name); - }); - } - - private void consumerLoop(TbQueueConsumer consumer) { - while (!stopped && !consumer.isStopped()) { - try { - List msgs = consumer.poll(pollInterval); - if (msgs.isEmpty()) { - continue; - } - msgPackProcessor.process(msgs, consumer); - } catch (Exception e) { - if (!consumer.isStopped()) { - log.warn("Failed to process messages from queue", e); - try { - Thread.sleep(pollInterval); - } catch (InterruptedException interruptedException) { - log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException); - } - } - } - } - } - - public void stop() { - log.debug("[{}] Stopping consumer", name); - stopped = true; - consumer.unsubscribe(); - } - - public interface MsgPackProcessor { - void process(List msgs, TbQueueConsumer consumer) throws Exception; - } -} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 2967143dd4..85227e50ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -85,12 +85,12 @@ public abstract class AbstractConsumerService>builder() - .key("notifications") .name("TB Notifications") .msgPackProcessor(this::processNotifications) .pollInterval(getNotificationPollDuration()) .consumerCreator(this::createNotificationsConsumer) .consumerExecutor(consumersExecutor) + .threadPrefix("notifications") .build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java index bf103bef60..90f4cf050c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueConsumerManager.java @@ -31,25 +31,25 @@ import java.util.function.Supplier; @Slf4j public class QueueConsumerManager { - private final String key; private final String name; private final MsgPackProcessor msgPackProcessor; private final long pollInterval; private final ExecutorService consumerExecutor; + private final String threadPrefix; @Getter private final TbQueueConsumer consumer; private volatile boolean stopped; @Builder - public QueueConsumerManager(String key, String name, MsgPackProcessor msgPackProcessor, + public QueueConsumerManager(String name, MsgPackProcessor msgPackProcessor, long pollInterval, Supplier> consumerCreator, - ExecutorService consumerExecutor) { - this.key = key; + ExecutorService consumerExecutor, String threadPrefix) { this.name = name; this.pollInterval = pollInterval; this.msgPackProcessor = msgPackProcessor; this.consumerExecutor = consumerExecutor; + this.threadPrefix = threadPrefix; this.consumer = consumerCreator.get(); } @@ -64,7 +64,9 @@ public class QueueConsumerManager { public void launch() { log.info("[{}] Launching consumer", name); consumerExecutor.submit(() -> { - ThingsBoardThreadFactory.addThreadNamePrefix(key); + if (threadPrefix != null) { + ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix); + } try { consumerLoop(consumer); } catch (Throwable e) { @@ -104,4 +106,5 @@ public class QueueConsumerManager { public interface MsgPackProcessor { void process(List msgs, TbQueueConsumer consumer) throws Exception; } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index d208fab29e..8b7b88c261 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -232,9 +232,8 @@ public class DefaultTransportService extends TransportActivityManager implements ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); transportApiRequestTemplate.init(); - consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("consumer")); + consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); transportNotificationsConsumer = QueueConsumerManager.>builder() - .key("transport") .name("TB Transport") .msgPackProcessor(this::processNotificationMsgs) .pollInterval(notificationsPollDuration) diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index a5a7021d8d..34171c19cd 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -132,14 +132,13 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe @PostConstruct public void init() { - consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("consumer")); + consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("vc-consumer")); var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread"); for (int i = 0; i < ioPoolSize; i++) { ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory))); } producer = producerProvider.getTbCoreNotificationsMsgProducer(); consumer = QueueConsumerManager.>builder() - .key("vc") .name("TB Version Control") .msgPackProcessor(this::processMsgs) .pollInterval(pollDuration)