From 4f7c1665663cb8e4c932e50c5d7b9f3405cdda64 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 19 May 2022 00:36:26 +0200 Subject: [PATCH] using repartitionExecutor instead of synchronized --- .../service/queue/DefaultTbRuleEngineConsumerService.java | 6 +++--- .../server/queue/discovery/HashPartitionService.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 2108b32992..1844929d35 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -393,10 +393,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); } else if (nfMsg.hasQueueUpdateMsg()) { - updateQueue(nfMsg.getQueueUpdateMsg()); + repartitionExecutor.execute(() -> updateQueue(nfMsg.getQueueUpdateMsg())); callback.onSuccess(); } else if (nfMsg.hasQueueDeleteMsg()) { - deleteQueue(nfMsg.getQueueDeleteMsg()); + repartitionExecutor.execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg())); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -404,7 +404,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } - private synchronized void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { + private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { log.info("Received queue update msg: [{}]", queueUpdateMsg); String queueName = queueUpdateMsg.getQueueName(); TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 92278f7b4a..f9033754ff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -173,7 +173,6 @@ public class HashPartitionService implements PartitionService { } else { QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId); - //TODO: replace if we can notify CheckPoint rule nodes about queue changes if (queueRoutingInfo == null) { log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId); queueKey = getMainQueueKey(serviceType, tenantId); @@ -205,6 +204,8 @@ public class HashPartitionService implements PartitionService { @Override public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { + partitionsInit(); + tbTransportServicesByType.clear(); logServiceInfo(currentService); otherServices.forEach(this::logServiceInfo);