Refactoring for consumer manager

This commit is contained in:
ViacheslavKlimov 2024-05-07 15:13:52 +03:00
parent 9f6156a0fb
commit 15d189ead7
8 changed files with 15 additions and 123 deletions

View File

@ -27,11 +27,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig; import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;

View File

@ -27,13 +27,13 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig; import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor; import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor;
import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService; import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.List; import java.util.List;

View File

@ -210,20 +210,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
.taskExecutor(mgmtExecutor) .taskExecutor(mgmtExecutor)
.build(); .build();
this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder() this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder()
.key("usage-stats")
.name("TB Usage Stats") .name("TB Usage Stats")
.msgPackProcessor(this::processUsageStatsMsg) .msgPackProcessor(this::processUsageStatsMsg)
.pollInterval(pollInterval) .pollInterval(pollInterval)
.consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer) .consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.threadPrefix("usage-stats")
.build(); .build();
this.firmwareStatesConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>>builder() this.firmwareStatesConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>>builder()
.key("firmware")
.name("TB Ota Package States") .name("TB Ota Package States")
.msgPackProcessor(this::processFirmwareMsgs) .msgPackProcessor(this::processFirmwareMsgs)
.pollInterval(pollInterval) .pollInterval(pollInterval)
.consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer) .consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.threadPrefix("firmware")
.build(); .build();
} }

View File

@ -1,109 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue.consumer;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@Slf4j
public class QueueConsumerManager<M extends TbQueueMsg> {
private final String name;
private final MsgPackProcessor<M> msgPackProcessor;
private final long pollInterval;
private final ExecutorService consumerExecutor;
private final String threadPrefix;
@Getter
private final TbQueueConsumer<M> consumer;
private volatile boolean stopped;
@Builder
public QueueConsumerManager(String name, MsgPackProcessor<M> msgPackProcessor,
long pollInterval, Supplier<TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor, String threadPrefix) {
this.name = name;
this.pollInterval = pollInterval;
this.msgPackProcessor = msgPackProcessor;
this.consumerExecutor = consumerExecutor;
this.threadPrefix = threadPrefix;
this.consumer = consumerCreator.get();
}
public void subscribe() {
consumer.subscribe();
}
public void subscribe(Set<TopicPartitionInfo> partitions) {
consumer.subscribe(partitions);
}
public void launch() {
log.info("[{}] Launching consumer", name);
consumerExecutor.submit(() -> {
if (threadPrefix != null) {
ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix);
}
try {
consumerLoop(consumer);
} catch (Throwable e) {
log.error("Failure in consumer loop", e);
}
log.info("[{}] Consumer stopped", name);
});
}
private void consumerLoop(TbQueueConsumer<M> consumer) {
while (!stopped && !consumer.isStopped()) {
try {
List<M> msgs = consumer.poll(pollInterval);
if (msgs.isEmpty()) {
continue;
}
msgPackProcessor.process(msgs, consumer);
} catch (Exception e) {
if (!consumer.isStopped()) {
log.warn("Failed to process messages from queue", e);
try {
Thread.sleep(pollInterval);
} catch (InterruptedException interruptedException) {
log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException);
}
}
}
}
}
public void stop() {
log.debug("[{}] Stopping consumer", name);
stopped = true;
consumer.unsubscribe();
}
public interface MsgPackProcessor<M extends TbQueueMsg> {
void process(List<M> msgs, TbQueueConsumer<M> consumer) throws Exception;
}
}

View File

@ -85,12 +85,12 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler")); this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler"));
this.nfConsumer = QueueConsumerManager.<TbProtoQueueMsg<N>>builder() this.nfConsumer = QueueConsumerManager.<TbProtoQueueMsg<N>>builder()
.key("notifications")
.name("TB Notifications") .name("TB Notifications")
.msgPackProcessor(this::processNotifications) .msgPackProcessor(this::processNotifications)
.pollInterval(getNotificationPollDuration()) .pollInterval(getNotificationPollDuration())
.consumerCreator(this::createNotificationsConsumer) .consumerCreator(this::createNotificationsConsumer)
.consumerExecutor(consumersExecutor) .consumerExecutor(consumersExecutor)
.threadPrefix("notifications")
.build(); .build();
} }

View File

@ -31,25 +31,25 @@ import java.util.function.Supplier;
@Slf4j @Slf4j
public class QueueConsumerManager<M extends TbQueueMsg> { public class QueueConsumerManager<M extends TbQueueMsg> {
private final String key;
private final String name; private final String name;
private final MsgPackProcessor<M> msgPackProcessor; private final MsgPackProcessor<M> msgPackProcessor;
private final long pollInterval; private final long pollInterval;
private final ExecutorService consumerExecutor; private final ExecutorService consumerExecutor;
private final String threadPrefix;
@Getter @Getter
private final TbQueueConsumer<M> consumer; private final TbQueueConsumer<M> consumer;
private volatile boolean stopped; private volatile boolean stopped;
@Builder @Builder
public QueueConsumerManager(String key, String name, MsgPackProcessor<M> msgPackProcessor, public QueueConsumerManager(String name, MsgPackProcessor<M> msgPackProcessor,
long pollInterval, Supplier<TbQueueConsumer<M>> consumerCreator, long pollInterval, Supplier<TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor) { ExecutorService consumerExecutor, String threadPrefix) {
this.key = key;
this.name = name; this.name = name;
this.pollInterval = pollInterval; this.pollInterval = pollInterval;
this.msgPackProcessor = msgPackProcessor; this.msgPackProcessor = msgPackProcessor;
this.consumerExecutor = consumerExecutor; this.consumerExecutor = consumerExecutor;
this.threadPrefix = threadPrefix;
this.consumer = consumerCreator.get(); this.consumer = consumerCreator.get();
} }
@ -64,7 +64,9 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
public void launch() { public void launch() {
log.info("[{}] Launching consumer", name); log.info("[{}] Launching consumer", name);
consumerExecutor.submit(() -> { consumerExecutor.submit(() -> {
ThingsBoardThreadFactory.addThreadNamePrefix(key); if (threadPrefix != null) {
ThingsBoardThreadFactory.addThreadNamePrefix(threadPrefix);
}
try { try {
consumerLoop(consumer); consumerLoop(consumer);
} catch (Throwable e) { } catch (Throwable e) {
@ -104,4 +106,5 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
public interface MsgPackProcessor<M extends TbQueueMsg> { public interface MsgPackProcessor<M extends TbQueueMsg> {
void process(List<M> msgs, TbQueueConsumer<M> consumer) throws Exception; void process(List<M> msgs, TbQueueConsumer<M> consumer) throws Exception;
} }
} }

View File

@ -232,9 +232,8 @@ public class DefaultTransportService extends TransportActivityManager implements
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
transportApiRequestTemplate.init(); transportApiRequestTemplate.init();
consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("consumer")); consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder() transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
.key("transport")
.name("TB Transport") .name("TB Transport")
.msgPackProcessor(this::processNotificationMsgs) .msgPackProcessor(this::processNotificationMsgs)
.pollInterval(notificationsPollDuration) .pollInterval(notificationsPollDuration)

View File

@ -132,14 +132,13 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
@PostConstruct @PostConstruct
public void init() { public void init() {
consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("consumer")); consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("vc-consumer"));
var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread"); var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
for (int i = 0; i < ioPoolSize; i++) { for (int i = 0; i < ioPoolSize; i++) {
ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory))); ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory)));
} }
producer = producerProvider.getTbCoreNotificationsMsgProducer(); producer = producerProvider.getTbCoreNotificationsMsgProducer();
consumer = QueueConsumerManager.<TbProtoQueueMsg<ToVersionControlServiceMsg>>builder() consumer = QueueConsumerManager.<TbProtoQueueMsg<ToVersionControlServiceMsg>>builder()
.key("vc")
.name("TB Version Control") .name("TB Version Control")
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
.pollInterval(pollDuration) .pollInterval(pollDuration)