From 4e526aa969bd446ee2b679b2cc7e555869f37ba9 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 14 Sep 2023 16:27:23 +0300 Subject: [PATCH] Queue removal: always publish PartitionChangeEvent; remove redundant recalculatePartitions; add debug log --- .../DefaultTbRuleEngineConsumerService.java | 2 -- .../queue/discovery/HashPartitionService.java | 24 ++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index f67d80cb1b..c715b7b03c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -188,7 +188,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions); Queue configuration = consumerConfigurations.get(queueKey); if (configuration == null) { - log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey); return; } if (!configuration.isConsumerPerPartition()) { @@ -503,7 +502,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } } - partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 1974c82ec4..5be5caf6ff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -179,10 +179,15 @@ public class HashPartitionService implements PartitionService { public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); //TODO: remove after merging tb entity services removeTenant(tenantId); + + if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { + publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); + } } @Override @@ -306,9 +311,7 @@ public class HashPartitionService implements PartitionService { partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>()) .put(queueKey, partitions); }); - partitionsByServiceType.forEach((serviceType, partitionsMap) -> { - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); - }); + partitionsByServiceType.forEach(this::publishPartitionChangeEvent); } if (currentOtherServices == null) { @@ -340,6 +343,18 @@ public class HashPartitionService implements PartitionService { applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService)); } + private void publishPartitionChangeEvent(ServiceType serviceType, Map> partitionsMap) { + if (log.isDebugEnabled()) { + log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() + + partitionsMap.entrySet().stream() + .map(entry -> entry.getKey() + " - " + entry.getValue().stream() + .map(TopicPartitionInfo::getFullTopicName).sorted() + .collect(Collectors.toList())) + .collect(Collectors.joining(System.lineSeparator()))); + } + applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); + } + @Override public Set getAllServiceIds(ServiceType serviceType) { return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet()); @@ -505,6 +520,9 @@ public class HashPartitionService implements PartitionService { } responsibleServices.put(profileId, responsible); } + if (responsible.isEmpty()) { + return null; + } servers = responsible; }