Merge branch 'feature/rule-engine-thread-per-topic-partition' of https://github.com/smatvienko-tb/thingsboard

This commit is contained in:
Andrii Shvaika 2021-07-06 13:23:20 +03:00
commit 026329f812
9 changed files with 245 additions and 78 deletions

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -64,12 +65,14 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
@ -79,12 +82,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
public static final String SUCCESSFUL_STATUS = "successful";
public static final String FAILED_STATUS = "failed";
public static final String THREAD_TOPIC_SPLITERATOR = " | ";
@Value("${queue.rule-engine.poll-interval}")
private long pollDuration;
@Value("${queue.rule-engine.pack-processing-timeout}")
private long packProcessingTimeout;
@Value("${queue.rule-engine.stats.enabled:true}")
private boolean statsEnabled;
@Value("${queue.rule-engine.prometheus-stats.enabled:false}")
boolean prometheusStatsEnabled;
private final StatsFactory statsFactory;
private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
@ -96,7 +102,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
private ExecutorService submitExecutor;
private final ConcurrentMap<String, TbTopicWithConsumerPerPartition> topicsConsumerPerPartition = new ConcurrentHashMap<>();
final ExecutorService submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor"));
final ScheduledExecutorService repartitionExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-repartition-executor"));
public DefaultTbRuleEngineConsumerService(TbRuleEngineProcessingStrategyFactory processingStrategyFactory,
TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
@ -125,10 +133,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer");
for (TbRuleEngineQueueConfiguration configuration : ruleEngineSettings.getQueues()) {
consumerConfigurations.putIfAbsent(configuration.getName(), configuration);
consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
consumerStats.put(configuration.getName(), new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
if (!configuration.isConsumerPerPartition()) {
consumers.computeIfAbsent(configuration.getName(), queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
} else {
topicsConsumerPerPartition.computeIfAbsent(configuration.getName(), TbTopicWithConsumerPerPartition::new);
}
}
submitExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-service-submit-executor"));
}
@PreDestroy
@ -145,96 +156,186 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (event.getServiceType().equals(getServiceType())) {
ServiceQueue serviceQueue = event.getServiceQueueKey().getServiceQueue();
log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), event.getPartitions());
consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions());
if (!consumerConfigurations.get(serviceQueue.getQueue()).isConsumerPerPartition()) {
consumers.get(serviceQueue.getQueue()).subscribe(event.getPartitions());
} else {
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue.getQueue(), event.getPartitions());
subscribeConsumerPerPartition(serviceQueue.getQueue(), event.getPartitions());
}
}
}
void subscribeConsumerPerPartition(String queue, Set<TopicPartitionInfo> partitions) {
topicsConsumerPerPartition.get(queue).getSubscribeQueue().add(partitions);
scheduleTopicRepartition(queue);
}
private void scheduleTopicRepartition(String queue) {
repartitionExecutor.schedule(() -> repartitionTopicWithConsumerPerPartition(queue), 1, TimeUnit.SECONDS);
}
void repartitionTopicWithConsumerPerPartition(final String queue) {
if (stopped) {
return;
}
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.get(queue);
Queue<Set<TopicPartitionInfo>> subscribeQueue = tbTopicWithConsumerPerPartition.getSubscribeQueue();
if (subscribeQueue.isEmpty()) {
return;
}
if (tbTopicWithConsumerPerPartition.getLock().tryLock()) {
try {
Set<TopicPartitionInfo> partitions = null;
while (!subscribeQueue.isEmpty()) {
partitions = subscribeQueue.poll();
}
if (partitions == null) {
return;
}
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = tbTopicWithConsumerPerPartition.getConsumers();
addedPartitions.removeAll(consumers.keySet());
log.info("calculated addedPartitions {}", addedPartitions);
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("calculated removedPartitions {}", removedPartitions);
removedPartitions.forEach((tpi) -> {
removeConsumerForTopicByTpi(queue, consumers, tpi);
});
addedPartitions.forEach((tpi) -> {
log.info("[{}] Adding consumer for topic: {}", queue, tpi);
TbRuleEngineQueueConfiguration configuration = consumerConfigurations.get(queue);
//consumerStats.computeIfAbsent(queue, queueName -> new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); //already created on init
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration);
consumers.put(tpi, consumer);
launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), "" + queue + "-" + tpi.getPartition().orElse(-999999));
consumer.subscribe(Collections.singleton(tpi));
});
} finally {
tbTopicWithConsumerPerPartition.getLock().unlock();
}
} else {
scheduleTopicRepartition(queue); //reschedule later
}
}
void removeConsumerForTopicByTpi(String queue, ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers, TopicPartitionInfo tpi) {
log.info("[{}] Removing consumer for topic: {}", queue, tpi);
consumers.get(tpi).unsubscribe();
consumers.remove(tpi);
}
@Override
protected void launchMainConsumers() {
consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue)));
consumers.forEach((queue, consumer) -> launchConsumer(consumer, consumerConfigurations.get(queue), consumerStats.get(queue), queue));
}
@Override
protected void stopMainConsumers() {
consumers.values().forEach(TbQueueConsumer::unsubscribe);
topicsConsumerPerPartition.values().forEach(tbTopicWithConsumerPerPartition -> tbTopicWithConsumerPerPartition.getConsumers().keySet()
.forEach((tpi)-> removeConsumerForTopicByTpi(tbTopicWithConsumerPerPartition.getTopic(), tbTopicWithConsumerPerPartition.getConsumers(), tpi)));
}
private void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats) {
consumersExecutor.execute(() -> {
Thread.currentThread().setName("" + Thread.currentThread().getName() + "-" + configuration.getName());
while (!stopped) {
try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
void launchConsumer(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
}
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
updateCurrentThreadName(threadSuffix);
while (!stopped && !consumer.isStopped()) {
try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
submitStrategy.init(msgs);
while (!stopped) {
TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy);
submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));
final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
if (timeout) {
printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
}
TbRuleEngineSubmitStrategy submitStrategy = submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy());
TbRuleEngineProcessingStrategy ackStrategy = processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy());
submitStrategy.init(msgs);
while (!stopped) {
TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy);
submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> {
log.trace("[{}] Creating callback for message: {}", id, msg.getValue());
ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
TbMsgCallback callback = statsEnabled ?
new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
new TbMsgPackCallback(id, tenantId, ctx);
try {
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
}
} catch (Exception e) {
callback.onFailure(new RuleEngineException(e.getMessage()));
}
}));
boolean timeout = false;
if (!ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
timeout = true;
}
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
if (timeout) {
printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
}
if (!ctx.getFailedMap().isEmpty()) {
printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
}
ctx.printProfilerStats();
TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
if (statsEnabled) {
stats.log(result, decision.isCommit());
}
ctx.cleanup();
if (decision.isCommit()) {
submitStrategy.stop();
break;
} else {
submitStrategy.update(decision.getReprocessMap());
}
if (!ctx.getFailedMap().isEmpty()) {
printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
}
consumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to process messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
ctx.printProfilerStats();
TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
if (statsEnabled) {
stats.log(result, decision.isCommit());
}
ctx.cleanup();
if (decision.isCommit()) {
submitStrategy.stop();
break;
} else {
submitStrategy.update(decision.getReprocessMap());
}
}
consumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to process messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
log.info("TB Rule Engine Consumer stopped.");
});
}
log.info("TB Rule Engine Consumer stopped.");
}
void updateCurrentThreadName(String threadSuffix) {
String name = Thread.currentThread().getName();
int spliteratorIndex = name.indexOf(THREAD_TOPIC_SPLITERATOR);
if (spliteratorIndex > 0) {
name = name.substring(0, spliteratorIndex);
}
name = name + THREAD_TOPIC_SPLITERATOR + threadSuffix;
Thread.currentThread().setName(name);
}
TbRuleEngineProcessingStrategy getAckStrategy(TbRuleEngineQueueConfiguration configuration) {
return processingStrategyFactory.newInstance(configuration.getName(), configuration.getProcessingStrategy());
}
TbRuleEngineSubmitStrategy getSubmitStrategy(TbRuleEngineQueueConfiguration configuration) {
return submitStrategyFactory.newInstance(configuration.getName(), configuration.getSubmitStrategy());
}
void submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {
log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue());
ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
TenantId tenantId = new TenantId(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
TbMsgCallback callback = prometheusStatsEnabled ?
new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) :
new TbMsgPackCallback(id, tenantId, ctx);
try {
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
}
} catch (Exception e) {
callback.onFailure(new RuleEngineException(e.getMessage()));
}
}
private void printFirstOrAll(TbRuleEngineQueueConfiguration configuration, TbMsgPackProcessingContext ctx, Map<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> map, String prefix) {

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
@RequiredArgsConstructor
@Data
public class TbTopicWithConsumerPerPartition {
private final String topic;
@Getter
private final ReentrantLock lock = new ReentrantLock(); //NonfairSync
private volatile Set<TopicPartitionInfo> partitions = Collections.emptySet();
private final ConcurrentMap<TopicPartitionInfo, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
}

View File

@ -866,6 +866,7 @@ queue:
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:false}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
@ -882,6 +883,7 @@ queue:
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:false}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
@ -898,6 +900,7 @@ queue:
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}"
consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:false}"
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}"
submit-strategy:
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL

View File

@ -34,4 +34,6 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
void commit();
boolean isStopped();
}

View File

@ -172,6 +172,11 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
}
}
@Override
public boolean isStopped() {
return stopped;
}
abstract protected List<R> doPoll(long durationInMillis);
abstract protected T decode(R record) throws IOException;

View File

@ -96,4 +96,10 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
@Override
public void commit() {
}
@Override
public boolean isStopped() {
return stopped;
}
}

View File

@ -54,6 +54,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='monolith'")
@ -75,6 +76,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin fwUpdatesAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
@ -159,7 +161,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-rule-engine'")
@ -66,6 +67,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin notificationAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
@ -145,7 +147,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(ruleEngineSettings.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);

View File

@ -24,6 +24,7 @@ public class TbRuleEngineQueueConfiguration {
private String topic;
private int pollInterval;
private int partitions;
private boolean consumerPerPartition;
private long packProcessingTimeout;
private TbRuleEngineQueueSubmitStrategyConfiguration submitStrategy;
private TbRuleEngineQueueAckStrategyConfiguration processingStrategy;