From 0d8a2549d959b59fb4053195d1e57eb0cbd8f848 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 14 Sep 2023 15:22:53 +0300 Subject: [PATCH] Single PartitionChangeEvent after recalculatePartitions --- .../actors/service/DefaultActorService.java | 2 +- .../DefaultTbRuleEngineConsumerService.java | 28 ++++++++++--------- .../discovery/HashPartitionServiceTest.java | 22 ++++++--------- .../state/DefaultDeviceStateServiceTest.java | 4 ++- .../common/msg/queue/PartitionChangeMsg.java | 4 --- .../queue/discovery/HashPartitionService.java | 16 +++++++++-- .../discovery/event/PartitionChangeEvent.java | 18 +++++++----- 7 files changed, 52 insertions(+), 42 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index dd5bd4f1ed..380da71443 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -121,7 +121,7 @@ public class DefaultActorService extends TbApplicationEventListener { + String serviceQueue = queueKey.getQueueName(); + log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions); + Queue configuration = consumerConfigurations.get(queueKey); + if (configuration == null) { + log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey); + return; + } + if (!configuration.isConsumerPerPartition()) { + consumers.get(queueKey).subscribe(partitions); + } else { + log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, partitions); + subscribeConsumerPerPartition(queueKey, partitions); + } + }); } } diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 5e800d52a5..49008ab559 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -286,10 +286,8 @@ public class HashPartitionServiceTest { HashPartitionService partitionService_common = createPartitionService(); partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); verifyPartitionChangeEvent(event -> { - return event.getQueueKey().getTenantId().isSysTenantId() && - event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && - event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet()) - .size() == systemQueue.getPartitions(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TenantId.SYS_TENANT_ID); + return event.getPartitionsMap().get(queueKey).size() == systemQueue.getPartitions(); }); Mockito.reset(applicationEventPublisher); @@ -316,18 +314,15 @@ public class HashPartitionServiceTest { partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); // expecting event about no partitions for isolated queue key verifyPartitionChangeEvent(event -> { - return event.getQueueKey().getTenantId().equals(tenantId) && - event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && - event.getPartitions().isEmpty(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); + return event.getPartitionsMap().get(queueKey).isEmpty(); }); partitionService_dedicated.updateQueue(queueUpdateMsg); partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); verifyPartitionChangeEvent(event -> { - return event.getQueueKey().getTenantId().equals(tenantId) && - event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && - event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet()) - .size() == isolatedQueue.getPartitions(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); + return event.getPartitionsMap().get(queueKey).size() == isolatedQueue.getPartitions(); }); @@ -345,9 +340,8 @@ public class HashPartitionServiceTest { partitionService_dedicated.removeQueue(queueDeleteMsg); partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); verifyPartitionChangeEvent(event -> { - return event.getQueueKey().getTenantId().equals(tenantId) && - event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && - event.getPartitions().isEmpty(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); + return event.getPartitionsMap().get(queueKey).isEmpty(); }); } diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index ccb0b5e26c..adb46affe8 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -160,7 +160,9 @@ public class DefaultDeviceStateServiceTest { Mockito.reset(service, telemetrySubscriptionService); ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout); service.init(); - PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)); + PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of( + new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi) + )); service.onApplicationEvent(event); Thread.sleep(100); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/PartitionChangeMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/PartitionChangeMsg.java index 39d7dad2db..fd48b36e67 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/PartitionChangeMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/PartitionChangeMsg.java @@ -20,8 +20,6 @@ import lombok.Getter; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; -import java.util.Set; - /** * @author Andrew Shvayka */ @@ -30,8 +28,6 @@ public final class PartitionChangeMsg implements TbActorMsg { @Getter private final ServiceType serviceType; - @Getter - private final Set partitions; @Override public MsgType getMsgType() { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 33fb350292..1974c82ec4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -271,6 +271,8 @@ public class HashPartitionService implements PartitionService { final ConcurrentMap> oldPartitions = myPartitions; myPartitions = newPartitions; + Map> changedPartitionsMap = new HashMap<>(); + Set removed = new HashSet<>(); oldPartitions.forEach((queueKey, partitions) -> { if (!newPartitions.containsKey(queueKey)) { @@ -286,7 +288,7 @@ public class HashPartitionService implements PartitionService { } removed.forEach(queueKey -> { log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet())); + changedPartitionsMap.put(queueKey, Collections.emptySet()); }); myPartitions.forEach((queueKey, partitions) -> { @@ -295,9 +297,19 @@ public class HashPartitionService implements PartitionService { Set tpiList = partitions.stream() .map(partition -> buildTopicPartitionInfo(queueKey, partition)) .collect(Collectors.toSet()); - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, tpiList)); + changedPartitionsMap.put(queueKey, tpiList); } }); + if (!changedPartitionsMap.isEmpty()) { + Map>> partitionsByServiceType = new HashMap<>(); + changedPartitionsMap.forEach((queueKey, partitions) -> { + partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>()) + .put(queueKey, partitions); + }); + partitionsByServiceType.forEach((serviceType, partitionsMap) -> { + applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); + }); + } if (currentOtherServices == null) { currentOtherServices = new ArrayList<>(otherServices); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index c7a00daa1c..0d74cdfaee 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.QueueKey; +import java.util.Collections; +import java.util.Map; import java.util.Set; @ToString(callSuper = true) @@ -29,17 +31,19 @@ public class PartitionChangeEvent extends TbApplicationEvent { private static final long serialVersionUID = -8731788167026510559L; @Getter - private final QueueKey queueKey; + private final ServiceType serviceType; @Getter - private final Set partitions; + private final Map> partitionsMap; - public PartitionChangeEvent(Object source, QueueKey queueKey, Set partitions) { + public PartitionChangeEvent(Object source, ServiceType serviceType, Map> partitionsMap) { super(source); - this.queueKey = queueKey; - this.partitions = partitions; + this.serviceType = serviceType; + this.partitionsMap = partitionsMap; } - public ServiceType getServiceType() { - return queueKey.getType(); + // only for service types that have single QueueKey + public Set getPartitions() { + return partitionsMap.values().stream().findAny().orElse(Collections.emptySet()); } + }