Merge pull request #13176 from volodymyr-babak/edge-fix-rebalance-too-often
Implement Unique Consumer Groups for Edge Notifications to Improve Stability
This commit is contained in:
commit
0f51c41be6
@ -141,6 +141,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
|
|||||||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic();
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic();
|
||||||
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
|
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
|
||||||
kafkaAdmin.deleteTopic(topic);
|
kafkaAdmin.deleteTopic(topic);
|
||||||
|
kafkaAdmin.deleteConsumerGroup(topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -113,7 +113,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
.ifPresentOrElse(lastConnectTime -> {
|
.ifPresentOrElse(lastConnectTime -> {
|
||||||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
||||||
if (kafkaAdmin.isTopicEmpty(topic)) {
|
if (kafkaAdmin.isTopicEmpty(topic)) {
|
||||||
kafkaAdmin.deleteTopic(topic);
|
deleteTopicAndConsumerGroup(topic);
|
||||||
log.info("[{}] Removed outdated topic {} for edge {} older than {}",
|
log.info("[{}] Removed outdated topic {} for edge {} older than {}",
|
||||||
tenantId, topic, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ttlMillis)));
|
tenantId, topic, edgeId, Date.from(Instant.ofEpochMilli(currentTimeMillis - ttlMillis)));
|
||||||
}
|
}
|
||||||
@ -121,7 +121,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
Edge edge = edgeService.findEdgeById(tenantId, edgeId);
|
Edge edge = edgeService.findEdgeById(tenantId, edgeId);
|
||||||
if (edge == null) {
|
if (edge == null) {
|
||||||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
||||||
kafkaAdmin.deleteTopic(topic);
|
deleteTopicAndConsumerGroup(topic);
|
||||||
log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId);
|
log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -132,12 +132,17 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
} else {
|
} else {
|
||||||
for (EdgeId edgeId : edgeIds) {
|
for (EdgeId edgeId : edgeIds) {
|
||||||
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
||||||
kafkaAdmin.deleteTopic(topic);
|
deleteTopicAndConsumerGroup(topic);
|
||||||
}
|
}
|
||||||
log.info("[{}] Removed topics for not existing tenant and edges {}", tenantId, edgeIds);
|
log.info("[{}] Removed topics for not existing tenant and edges {}", tenantId, edgeIds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void deleteTopicAndConsumerGroup(String topic) {
|
||||||
|
kafkaAdmin.deleteTopic(topic);
|
||||||
|
kafkaAdmin.deleteConsumerGroup(topic);
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isTopicExpired(long lastConnectTime, long ttlMillis, long currentTimeMillis) {
|
private boolean isTopicExpired(long lastConnectTime, long ttlMillis, long currentTimeMillis) {
|
||||||
return lastConnectTime + ttlMillis < currentTimeMillis;
|
return lastConnectTime + ttlMillis < currentTimeMillis;
|
||||||
}
|
}
|
||||||
@ -146,7 +151,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
|
|||||||
Map<TenantId, List<EdgeId>> tenantEdgeMap = new HashMap<>();
|
Map<TenantId, List<EdgeId>> tenantEdgeMap = new HashMap<>();
|
||||||
for (String topic : topics) {
|
for (String topic : topics) {
|
||||||
try {
|
try {
|
||||||
String remaining = topic.substring(prefix.length());
|
String remaining = topic.substring(prefix.length() + 1);
|
||||||
String[] parts = remaining.split("\\.");
|
String[] parts = remaining.split("\\.");
|
||||||
TenantId tenantId = TenantId.fromUUID(UUID.fromString(parts[0]));
|
TenantId tenantId = TenantId.fromUUID(UUID.fromString(parts[0]));
|
||||||
EdgeId edgeId = new EdgeId(UUID.fromString(parts[1]));
|
EdgeId edgeId = new EdgeId(UUID.fromString(parts[1]));
|
||||||
|
|||||||
@ -0,0 +1,22 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2025 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.queue;
|
||||||
|
|
||||||
|
public interface TbEdgeQueueAdmin extends TbQueueAdmin {
|
||||||
|
void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId);
|
||||||
|
|
||||||
|
void deleteConsumerGroup(String consumerGroupId);
|
||||||
|
}
|
||||||
@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
|
|||||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.errors.TopicExistsException;
|
import org.apache.kafka.common.errors.TopicExistsException;
|
||||||
|
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
|
||||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||||
import org.thingsboard.server.queue.util.PropertyUtils;
|
import org.thingsboard.server.queue.util.PropertyUtils;
|
||||||
|
|
||||||
@ -43,7 +44,7 @@ import java.util.stream.Collectors;
|
|||||||
* Created by ashvayka on 24.09.18.
|
* Created by ashvayka on 24.09.18.
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class TbKafkaAdmin implements TbQueueAdmin {
|
public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
|
||||||
|
|
||||||
private final TbKafkaSettings settings;
|
private final TbKafkaSettings settings;
|
||||||
private final Map<String, String> topicConfigs;
|
private final Map<String, String> topicConfigs;
|
||||||
@ -149,17 +150,38 @@ public class TbKafkaAdmin implements TbQueueAdmin {
|
|||||||
* */
|
* */
|
||||||
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
|
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
|
||||||
try {
|
try {
|
||||||
syncOffsetsUnsafe(fatGroupId, newGroupId, partitionId);
|
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
|
||||||
|
if (partitionId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
|
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, Integer partitionId) throws ExecutionException, InterruptedException, TimeoutException {
|
/**
|
||||||
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
|
* Sync edge notifications offsets from a fat group to a single group per edge
|
||||||
if (partitionId == null) {
|
* */
|
||||||
return;
|
public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) {
|
||||||
|
try {
|
||||||
|
log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId);
|
||||||
|
syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteConsumerGroup(String consumerGroupId) {
|
||||||
|
try {
|
||||||
|
settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId));
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
|
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
|
||||||
if (oldOffsets.isEmpty()) {
|
if (oldOffsets.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
@ -167,7 +189,7 @@ public class TbKafkaAdmin implements TbQueueAdmin {
|
|||||||
|
|
||||||
for (var consumerOffset : oldOffsets.entrySet()) {
|
for (var consumerOffset : oldOffsets.entrySet()) {
|
||||||
var tp = consumerOffset.getKey();
|
var tp = consumerOffset.getKey();
|
||||||
if (!tp.topic().endsWith("." + partitionId)) {
|
if (!tp.topic().endsWith(topicSuffix)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
var om = consumerOffset.getValue();
|
var om = consumerOffset.getValue();
|
||||||
|
|||||||
@ -46,6 +46,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
|
||||||
|
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
|
||||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||||
import org.thingsboard.server.queue.TbQueueProducer;
|
import org.thingsboard.server.queue.TbQueueProducer;
|
||||||
@ -103,7 +104,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
private final TbQueueAdmin vcAdmin;
|
private final TbQueueAdmin vcAdmin;
|
||||||
private final TbQueueAdmin housekeeperAdmin;
|
private final TbQueueAdmin housekeeperAdmin;
|
||||||
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
||||||
private final TbQueueAdmin edgeAdmin;
|
private final TbEdgeQueueAdmin edgeAdmin;
|
||||||
private final TbQueueAdmin edgeEventAdmin;
|
private final TbQueueAdmin edgeEventAdmin;
|
||||||
private final TbQueueAdmin cfAdmin;
|
private final TbQueueAdmin cfAdmin;
|
||||||
private final TbQueueAdmin cfStateAdmin;
|
private final TbQueueAdmin cfStateAdmin;
|
||||||
@ -495,9 +496,13 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
||||||
|
|
||||||
|
edgeAdmin.syncEdgeNotificationsOffsets(topicService.buildTopicName("monolith-edge-event-consumer"), topic);
|
||||||
|
|
||||||
|
consumerBuilder.topic(topic);
|
||||||
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
||||||
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
|
consumerBuilder.groupId(topic);
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(edgeEventAdmin);
|
consumerBuilder.admin(edgeEventAdmin);
|
||||||
consumerBuilder.statsService(consumerStatsService);
|
consumerBuilder.statsService(consumerStatsService);
|
||||||
|
|||||||
@ -42,6 +42,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
|
||||||
|
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
|
||||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||||
import org.thingsboard.server.queue.TbQueueProducer;
|
import org.thingsboard.server.queue.TbQueueProducer;
|
||||||
@ -99,7 +100,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
private final TbQueueAdmin vcAdmin;
|
private final TbQueueAdmin vcAdmin;
|
||||||
private final TbQueueAdmin housekeeperAdmin;
|
private final TbQueueAdmin housekeeperAdmin;
|
||||||
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
||||||
private final TbQueueAdmin edgeAdmin;
|
private final TbEdgeQueueAdmin edgeAdmin;
|
||||||
private final TbQueueAdmin edgeEventAdmin;
|
private final TbQueueAdmin edgeEventAdmin;
|
||||||
private final TbQueueAdmin cfAdmin;
|
private final TbQueueAdmin cfAdmin;
|
||||||
private final TbQueueAdmin edqsEventsAdmin;
|
private final TbQueueAdmin edqsEventsAdmin;
|
||||||
@ -439,9 +440,13 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic());
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
|
||||||
|
|
||||||
|
edgeAdmin.syncEdgeNotificationsOffsets(topicService.buildTopicName("tb-core-edge-event-consumer"), topic);
|
||||||
|
|
||||||
|
consumerBuilder.topic(topic);
|
||||||
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
|
||||||
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
|
consumerBuilder.groupId(topic);
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(edgeEventAdmin);
|
consumerBuilder.admin(edgeEventAdmin);
|
||||||
consumerBuilder.statsService(consumerStatsService);
|
consumerBuilder.statsService(consumerStatsService);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user