Minor refactoring
This commit is contained in:
parent
c56132d6e8
commit
92e0c65a37
@ -142,8 +142,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
|
||||
private final TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
|
||||
|
||||
protected volatile ExecutorService consumersExecutor;
|
||||
protected volatile ExecutorService usageStatsExecutor;
|
||||
|
||||
private volatile ExecutorService firmwareStatesExecutor;
|
||||
|
||||
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
|
||||
@ -186,7 +186,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.init("tb-core-consumer", "tb-core-notifications-consumer");
|
||||
super.init("tb-core-notifications-consumer");
|
||||
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-consumer"));
|
||||
this.usageStatsExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
|
||||
this.firmwareStatesExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
|
||||
}
|
||||
@ -194,6 +195,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
super.destroy();
|
||||
if (consumersExecutor != null) {
|
||||
consumersExecutor.shutdownNow();
|
||||
}
|
||||
if (usageStatsExecutor != null) {
|
||||
usageStatsExecutor.shutdownNow();
|
||||
}
|
||||
@ -681,7 +685,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopMainConsumers() {
|
||||
protected void stopConsumers() {
|
||||
if (mainConsumer != null) {
|
||||
mainConsumer.unsubscribe();
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.service.queue;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -27,6 +28,7 @@ import org.thingsboard.server.common.data.rpc.RpcError;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
@ -35,6 +37,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
|
||||
import org.thingsboard.server.queue.util.AfterStartUp;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
@ -59,6 +62,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||
public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
|
||||
|
||||
private final TbRuleEngineConsumerContext ctx;
|
||||
private final QueueService queueService;
|
||||
private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
|
||||
|
||||
private final ConcurrentMap<QueueKey, TbRuleEngineQueueConsumerManager> consumers = new ConcurrentHashMap<>();
|
||||
@ -68,6 +72,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
ActorSystemContext actorContext,
|
||||
DataDecodingEncodingService encodingService,
|
||||
TbRuleEngineDeviceRpcService tbDeviceRpcService,
|
||||
QueueService queueService,
|
||||
TbDeviceProfileCache deviceProfileCache,
|
||||
TbAssetProfileCache assetProfileCache,
|
||||
TbTenantProfileCache tenantProfileCache,
|
||||
@ -77,12 +82,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty());
|
||||
this.ctx = ctx;
|
||||
this.tbDeviceRpcService = tbDeviceRpcService;
|
||||
this.queueService = queueService;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.init("tb-rule-engine-notifications-consumer"); // TODO: restore init of the main consumer?
|
||||
List<Queue> queues = ctx.findAllQueues();
|
||||
super.init("tb-rule-engine-notifications-consumer");
|
||||
List<Queue> queues = queueService.findAllQueues();
|
||||
for (Queue configuration : queues) {
|
||||
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
|
||||
initConsumer(configuration);
|
||||
@ -91,30 +97,38 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
}
|
||||
|
||||
private void initConsumer(Queue configuration) {
|
||||
consumers.computeIfAbsent(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration),
|
||||
key -> new TbRuleEngineQueueConsumerManager(ctx, key)).init(configuration);
|
||||
getOrCreateConsumer(new QueueKey(ServiceType.TB_RULE_ENGINE, configuration)).init(configuration);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||
if (event.getServiceType().equals(getServiceType())) {
|
||||
var consumer = consumers.get(event.getQueueKey());
|
||||
if (consumer != null) {
|
||||
consumer.subscribe(event);
|
||||
} else {
|
||||
log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
|
||||
}
|
||||
event.getPartitionsMap().forEach((queueKey, partitions) -> {
|
||||
var consumer = consumers.get(queueKey);
|
||||
if (consumer != null) {
|
||||
consumer.update(partitions);
|
||||
} else {
|
||||
log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
|
||||
public void onApplicationEvent(ApplicationReadyEvent event) {
|
||||
super.onApplicationEvent(event);
|
||||
ctx.setReady(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void launchMainConsumers() {
|
||||
consumers.values().forEach(TbRuleEngineQueueConsumerManager::launchMainConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopMainConsumers() {
|
||||
protected void stopConsumers() {
|
||||
consumers.values().forEach(TbRuleEngineQueueConsumerManager::stop);
|
||||
ctx.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -164,8 +178,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
|
||||
String queueName = queueUpdateMsg.getQueueName();
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
|
||||
Queue queue = ctx.getQueueService().findQueueById(tenantId, queueId);
|
||||
consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)).update(queue);
|
||||
Queue queue = queueService.findQueueById(tenantId, queueId);
|
||||
|
||||
TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey);
|
||||
Queue oldQueue = consumerManager.getQueue();
|
||||
consumerManager.update(queue);
|
||||
|
||||
if (oldQueue != null && queue.getPartitions() == oldQueue.getPartitions()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
partitionService.updateQueue(queueUpdateMsg);
|
||||
@ -177,15 +198,19 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
log.info("Received queue delete msg: [{}]", queueDeleteMsg);
|
||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
var consumerManager = consumers.remove(queueKey);
|
||||
if (consumerManager != null) {
|
||||
consumerManager.delete();
|
||||
}
|
||||
|
||||
partitionService.removeQueue(queueDeleteMsg);
|
||||
var manager = consumers.remove(queueKey);
|
||||
if (manager != null) {
|
||||
manager.delete();
|
||||
}
|
||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||
}
|
||||
|
||||
private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) {
|
||||
return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key));
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
|
||||
public void printStats() {
|
||||
if (ctx.isStatsEnabled()) {
|
||||
|
||||
@ -67,9 +67,9 @@ public class TbRuleEngineConsumerStats {
|
||||
private final String queueName;
|
||||
private final TenantId tenantId;
|
||||
|
||||
public TbRuleEngineConsumerStats(QueueKey queue, StatsFactory statsFactory) {
|
||||
this.queueName = queue.getQueueName();
|
||||
this.tenantId = queue.getTenantId();
|
||||
public TbRuleEngineConsumerStats(QueueKey queueKey, StatsFactory statsFactory) {
|
||||
this.queueName = queueKey.getQueueName();
|
||||
this.tenantId = queueKey.getTenantId();
|
||||
this.statsFactory = statsFactory;
|
||||
|
||||
String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
|
||||
|
||||
@ -65,7 +65,6 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
|
||||
|
||||
protected volatile ExecutorService consumersExecutor;
|
||||
protected volatile ExecutorService notificationsConsumerExecutor;
|
||||
protected volatile boolean stopped = false;
|
||||
protected volatile boolean isReady = false;
|
||||
@ -99,8 +98,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
this.jwtSettingsService = jwtSettingsService;
|
||||
}
|
||||
|
||||
public void init(String mainConsumerThreadName, String nfConsumerThreadName) {
|
||||
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(mainConsumerThreadName));
|
||||
public void init(String nfConsumerThreadName) {
|
||||
this.notificationsConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(nfConsumerThreadName));
|
||||
}
|
||||
|
||||
@ -117,7 +115,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
|
||||
protected abstract void launchMainConsumers();
|
||||
|
||||
protected abstract void stopMainConsumers();
|
||||
protected abstract void stopConsumers();
|
||||
|
||||
protected abstract long getNotificationPollDuration();
|
||||
|
||||
@ -222,13 +220,10 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
stopped = true;
|
||||
stopMainConsumers();
|
||||
stopConsumers();
|
||||
if (nfConsumer != null) {
|
||||
nfConsumer.unsubscribe();
|
||||
}
|
||||
if (consumersExecutor != null) {
|
||||
consumersExecutor.shutdownNow();
|
||||
}
|
||||
if (notificationsConsumerExecutor != null) {
|
||||
notificationsConsumerExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
@ -6,24 +6,22 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
|
||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
|
||||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
|
||||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
||||
import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.List;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@TbRuleEngineComponent
|
||||
@ -41,10 +39,8 @@ public class TbRuleEngineConsumerContext {
|
||||
boolean prometheusStatsEnabled;
|
||||
@Value("${queue.rule-engine.topic-deletion-delay:30}")
|
||||
private int topicDeletionDelayInSec;
|
||||
|
||||
//TODO: check if they are set correctly.
|
||||
protected volatile boolean stopped = false;
|
||||
protected volatile boolean isReady = false;
|
||||
@Value("${queue.rule-engine.management-thread-pool-size:12}")
|
||||
private int mgmtThreadPoolSize;
|
||||
|
||||
private final ActorSystemContext actorContext;
|
||||
private final StatsFactory statsFactory;
|
||||
@ -53,26 +49,31 @@ public class TbRuleEngineConsumerContext {
|
||||
private final TbRuleEngineQueueFactory queueFactory;
|
||||
private final RuleEngineStatisticsService statisticsService;
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
private final QueueService queueService;
|
||||
private final PartitionService partitionService;
|
||||
private final TbQueueProducerProvider producerProvider;
|
||||
private final TbQueueAdmin queueAdmin;
|
||||
|
||||
//TODO: add reasonable limit for mgmt pool.
|
||||
private final ExecutorService mgmtExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-mgmt"));
|
||||
private final ExecutorService consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
|
||||
//TODO: do we actually need this?
|
||||
private final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-submit"));
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler"));
|
||||
private ExecutorService consumersExecutor;
|
||||
private ExecutorService mgmtExecutor;
|
||||
private ScheduledExecutorService scheduler;
|
||||
|
||||
public List<Queue> findAllQueues() {
|
||||
return queueService.findAllQueues();
|
||||
private volatile boolean isReady = false;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
|
||||
this.mgmtExecutor = Executors.newFixedThreadPool(mgmtThreadPoolSize, ThingsBoardThreadFactory.forName("tb-rule-engine-mgmt"));
|
||||
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
mgmtExecutor.shutdownNow();
|
||||
consumersExecutor.shutdownNow(); // TODO: shutdown or shutdownNow?
|
||||
submitExecutor.shutdownNow();
|
||||
scheduler.shutdownNow();
|
||||
consumersExecutor.shutdown();
|
||||
mgmtExecutor.shutdown();
|
||||
try {
|
||||
mgmtExecutor.awaitTermination(15, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Failed to await mgmtExecutor termination");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1303,7 +1303,9 @@ queue:
|
||||
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries;
|
||||
max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries.
|
||||
# After a queue is deleted (or profile's isolation option was disabled), Rule Engine will continue reading related topics during this period, before deleting the actual topics
|
||||
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:30}"
|
||||
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}"
|
||||
# Size of the thread pool that handles management operations such as subscribe, unsubscribe, queue delete, etc.
|
||||
management-thread-pool-size: "${TB_QUEUE_RULE_ENGINE_MGMT_THREAD_POOL_SIZE:12}"
|
||||
transport:
|
||||
# For high priority notifications that require minimum latency and processing time
|
||||
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
|
||||
|
||||
@ -37,12 +37,14 @@ import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
|
||||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
|
||||
import org.thingsboard.server.service.stats.DefaultRuleEngineStatisticsService;
|
||||
@ -163,7 +165,7 @@ public class BaseQueueControllerTest extends AbstractControllerTest {
|
||||
tenantId, ruleEngineException
|
||||
)));
|
||||
|
||||
TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(queue, statsFactory);
|
||||
TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(new QueueKey(ServiceType.TB_RULE_ENGINE, queue), statsFactory);
|
||||
testStats.log(testProcessingResult, true);
|
||||
|
||||
int queueStatsTtlDays = 14;
|
||||
@ -215,7 +217,7 @@ public class BaseQueueControllerTest extends AbstractControllerTest {
|
||||
tenantId, ruleEngineException
|
||||
)));
|
||||
|
||||
TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(queue, statsFactory);
|
||||
TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(new QueueKey(ServiceType.TB_RULE_ENGINE, queue), statsFactory);
|
||||
testStats.log(testProcessingResult, true);
|
||||
ruleEngineStatisticsService.reportQueueStats(System.currentTimeMillis(), testStats);
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user