diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 8b0f6ad6b4..64c5d03dce 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.gen.transport.TransportProtos; @@ -64,12 +65,14 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Service @@ -79,12 +82,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< public static final String SUCCESSFUL_STATUS = "successful"; public static final String FAILED_STATUS = "failed"; + public static final String THREAD_TOPIC_SPLITERATOR = " | "; @Value("${queue.rule-engine.poll-interval}") private long pollDuration; @Value("${queue.rule-engine.pack-processing-timeout}") private long packProcessingTimeout; @Value("${queue.rule-engine.stats.enabled:true}") private boolean statsEnabled; + @Value("${queue.rule-engine.prometheus-stats.enabled:false}") + boolean prometheusStatsEnabled; private final StatsFactory statsFactory; private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory; @@ -96,7 +102,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); - private ExecutorService submitExecutor; + private final ConcurrentMap topicsConsumerPerPartition = new ConcurrentHashMap<>(); + final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor")); + final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition-executor")); public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory, TbRuleEngineSubmitStrategyFactory submitStrategyFactory, @@ -125,10 +133,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer"); for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) { consumerConfigurations.putIfAbsent(configuration.getName(), configuration); - consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); + if (!configuration.isConsumerPerPartition()) { + consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); + } else { + topicsConsumerPerPartition.computeIfAbsent(configuration.getName(), TbTopicWithConsumerPerPartition::new); + } } - submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor")); } @PreDestroy @@ -145,96 +156,186 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (event.getServiceType().equals(getServiceType())) { ServiceQueue serviceQueue = event.getServiceQueueKey().getServiceQueue(); log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), event.getPartitions()); - consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions()); + if (!consumerConfigurations.get(serviceQueue.getQueue()).isConsumerPerPartition()) { + consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions()); + } else { + log.info("[{}] Subscribing consumer per partition: {}", serviceQueue.getQueue(), event.getPartitions()); + subscribeConsumerPerPartition(serviceQueue.getQueue(), event.getPartitions()); + } } } + void subscribeConsumerPerPartition(String queue, Set partitions) { + topicsConsumerPerPartition.get(queue).getSubscribeQueue().add(partitions); + scheduleTopicRepartition(queue); + } + + private void scheduleTopicRepartition(String queue) { + repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS); + } + + void repartitionTopicWithConsumerPerPartition(final String queue) { + if (stopped) { + return; + } + TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queue); + Queue> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue(); + if (subscribeQueue.isEmpty()) { + return; + } + if (tbTopicWithConsumerPerPartition.getLock().tryLock()) { + try { + Set partitions = null; + while (!subscribeQueue.isEmpty()) { + partitions = subscribeQueue.poll(); + } + if (partitions == null) { + return; + } + + Set addedPartitions = new HashSet<>(partitions); + ConcurrentMap>> consumers = tbTopicWithConsumerPerPartition.getConsumers(); + addedPartitions.removeAll(consumers.keySet()); + log.info("calculated addedPartitions {}", addedPartitions); + + Set removedPartitions = new HashSet<>(consumers.keySet()); + removedPartitions.removeAll(partitions); + log.info("calculated removedPartitions {}", removedPartitions); + + removedPartitions.forEach((tpi) -> { + removeConsumerForTopicByTpi(queue, consumers, tpi); + }); + + addedPartitions.forEach((tpi) -> { + log.info("[{}] Adding consumer for topic: {}", queue, tpi); + TbRuleEngineQueueConfiguration configuration = consumerConfigurations.get(queue); + //consumerStats.computeIfAbsent(queue, queueName -> new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); //already created on init + TbQueueConsumer> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration); + consumers.put(tpi, consumer); + launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), "" + queue + "-" + tpi.getPartition().orElse(-999999)); + consumer.subscribe(Collections.singleton(tpi)); + }); + + } finally { + tbTopicWithConsumerPerPartition.getLock().unlock(); + } + } else { + scheduleTopicRepartition(queue); //reschedule later + } + + } + + void removeConsumerForTopicByTpi(String queue, ConcurrentMap>> consumers, TopicPartitionInfo tpi) { + log.info("[{}] Removing consumer for topic: {}", queue, tpi); + consumers.get(tpi).unsubscribe(); + consumers.remove(tpi); + } + @Override protected void launchMainConsumers() { - consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue))); + consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), queue)); } @Override protected void stopMainConsumers() { consumers.values().forEach(TbQueueConsumer::unsubscribe); + topicsConsumerPerPartition.values().forEach(tbTopicWithConsumerPerPartition -> tbTopicWithConsumerPerPartition.getConsumers().keySet() + .forEach((tpi)-> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi))); } - private void launchConsumer(TbQueueConsumer> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats) { - consumersExecutor.execute(() -> { - Thread.currentThread().setName("" + Thread.currentThread().getName() + "-" + configuration.getName()); - while (!stopped) { - try { - List> msgs = consumer.poll(pollDuration); - if (msgs.isEmpty()) { - continue; + void launchConsumer(TbQueueConsumer> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { + consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + } + + void consumerLoop(TbQueueConsumer> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { + updateCurrentThreadName(threadSuffix); + while (!stopped && !consumer.isStopped()) { + try { + List> msgs = consumer.poll(pollDuration); + if (msgs.isEmpty()) { + continue; + } + final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration); + final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration); + submitStrategy.init(msgs); + while (!stopped) { + TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy); + submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg))); + + final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS); + + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx); + if (timeout) { + printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout"); } - TbRuleEngineSubmitStrategy submitStrategy = submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy()); - TbRuleEngineProcessingStrategy ackStrategy = processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy()); - - submitStrategy.init(msgs); - - while (!stopped) { - TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy); - submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> { - log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); - ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); - TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); - TbMsgCallback callback = statsEnabled ? - new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : - new TbMsgPackCallback(id, tenantId, ctx); - try { - if (!toRuleEngineMsg.getTbMsg().isEmpty()) { - forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); - } else { - callback.onSuccess(); - } - } catch (Exception e) { - callback.onFailure(new RuleEngineException(e.getMessage())); - } - })); - - boolean timeout = false; - if (!ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS)) { - timeout = true; - } - - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx); - if (timeout) { - printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout"); - } - if (!ctx.getFailedMap().isEmpty()) { - printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); - } - ctx.printProfilerStats(); - - TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); - if (statsEnabled) { - stats.log(result, decision.isCommit()); - } - - ctx.cleanup(); - - if (decision.isCommit()) { - submitStrategy.stop(); - break; - } else { - submitStrategy.update(decision.getReprocessMap()); - } + if (!ctx.getFailedMap().isEmpty()) { + printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); } - consumer.commit(); - } catch (Exception e) { - if (!stopped) { - log.warn("Failed to process messages from queue.", e); - try { - Thread.sleep(pollDuration); - } catch (InterruptedException e2) { - log.trace("Failed to wait until the server has capacity to handle new requests", e2); - } + ctx.printProfilerStats(); + + TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); + if (statsEnabled) { + stats.log(result, decision.isCommit()); + } + + ctx.cleanup(); + + if (decision.isCommit()) { + submitStrategy.stop(); + break; + } else { + submitStrategy.update(decision.getReprocessMap()); + } + } + consumer.commit(); + } catch (Exception e) { + if (!stopped) { + log.warn("Failed to process messages from queue.", e); + try { + Thread.sleep(pollDuration); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); } } } - log.info("TB Rule Engine Consumer stopped."); - }); + } + log.info("TB Rule Engine Consumer stopped."); + } + + void updateCurrentThreadName(String threadSuffix) { + String name = Thread.currentThread().getName(); + int spliteratorIndex = name.indexOf(THREAD_TOPIC_SPLITERATOR); + if (spliteratorIndex > 0) { + name = name.substring(0, spliteratorIndex); + } + name = name + THREAD_TOPIC_SPLITERATOR + threadSuffix; + Thread.currentThread().setName(name); + } + + TbRuleEngineProcessingStrategy getAckStrategy(TbRuleEngineQueueConfiguration configuration) { + return processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy()); + } + + TbRuleEngineSubmitStrategy getSubmitStrategy(TbRuleEngineQueueConfiguration configuration) { + return submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy()); + } + + void submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg msg) { + log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue()); + ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); + TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); + TbMsgCallback callback = prometheusStatsEnabled ? + new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : + new TbMsgPackCallback(id, tenantId, ctx); + try { + if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { + forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); + } else { + callback.onSuccess(); + } + } catch (Exception e) { + callback.onFailure(new RuleEngineException(e.getMessage())); + } } private void printFirstOrAll(TbRuleEngineQueueConfiguration configuration, TbMsgPackProcessingContext ctx, Map> map, String prefix) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbTopicWithConsumerPerPartition.java b/application/src/main/java/org/thingsboard/server/service/queue/TbTopicWithConsumerPerPartition.java new file mode 100644 index 0000000000..ce6cacf17d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbTopicWithConsumerPerPartition.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; + +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + +@RequiredArgsConstructor +@Data +public class TbTopicWithConsumerPerPartition { + private final String topic; + @Getter + private final ReentrantLock lock = new ReentrantLock(); //NonfairSync + private volatile Set partitions = Collections.emptySet(); + private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); + private final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 37b8fc1003..ddce060f43 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -866,6 +866,7 @@ queue: topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" + consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:false}" pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL @@ -882,6 +883,7 @@ queue: topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}" + consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:false}" pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL @@ -898,6 +900,7 @@ queue: topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}" + consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:false}" pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}" submit-strategy: type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java index fd5e84325a..b5acd94f63 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java @@ -34,4 +34,6 @@ public interface TbQueueConsumer { void commit(); + boolean isStopped(); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index fc3ddf22ff..a081dcd95d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -172,6 +172,11 @@ public abstract class AbstractTbQueueConsumerTemplate i } } + @Override + public boolean isStopped() { + return stopped; + } + abstract protected List doPoll(long durationInMillis); abstract protected T decode(R record) throws IOException; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index 6e28d48a4b..0d6d4bf729 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -96,4 +96,10 @@ public class InMemoryTbQueueConsumer implements TbQueueCon @Override public void commit() { } + + @Override + public boolean isStopped() { + return stopped; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index c57260bef8..3bb0a53ad1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -54,6 +54,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicLong; @Component @ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='monolith'") @@ -75,6 +76,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin transportApiAdmin; private final TbQueueAdmin notificationAdmin; private final TbQueueAdmin fwUpdatesAdmin; + private final AtomicLong consumerCount = new AtomicLong(); public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -159,7 +161,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); - consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index a8247dc11e..de6ccc1bba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import javax.annotation.PreDestroy; import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicLong; @Component @ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-rule-engine'") @@ -66,6 +67,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin ruleEngineAdmin; private final TbQueueAdmin jsExecutorAdmin; private final TbQueueAdmin notificationAdmin; + private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -145,7 +147,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(ruleEngineSettings.getTopic()); - consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId()); + consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); consumerBuilder.groupId("re-" + queueName + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java index a07380d489..0d90f8cd54 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbRuleEngineQueueConfiguration.java @@ -24,6 +24,7 @@ public class TbRuleEngineQueueConfiguration { private String topic; private int pollInterval; private int partitions; + private boolean consumerPerPartition; private long packProcessingTimeout; private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy; private TbRuleEngineQueueAckStrategyConfiguration processingStrategy;