Fix createEdgeEventMsgConsumer
This commit is contained in:
parent
42ba543fb0
commit
dc680ab007
@ -494,7 +494,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
|
||||
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
|
||||
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
|
||||
@ -440,7 +440,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||
consumerBuilder.settings(kafkaSettings);
|
||||
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
|
||||
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
|
||||
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
||||
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
|
||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user