diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index b0ef8f4cde..e13342eee0 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -50,22 +50,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb public Queue saveQueue(Queue queue) { boolean create = queue.getId() == null; Queue oldQueue; - if (create) { oldQueue = null; } else { oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId()); } - //TODO: add checkNotNull Queue savedQueue = queueService.saveQueue(queue); - - if (create) { - onQueueCreated(savedQueue); - } else { - onQueueUpdated(savedQueue, oldQueue); - } - + createTopicsIfNeeded(savedQueue, oldQueue); + tbClusterService.onQueuesUpdate(List.of(savedQueue)); return savedQueue; } @@ -73,54 +66,14 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb public void deleteQueue(TenantId tenantId, QueueId queueId) { Queue queue = queueService.findQueueById(tenantId, queueId); queueService.deleteQueue(tenantId, queueId); - onQueueDeleted(queue); + tbClusterService.onQueuesDelete(List.of(queue)); } @Override public void deleteQueueByQueueName(TenantId tenantId, String queueName) { Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName); queueService.deleteQueue(tenantId, queue.getId()); - onQueueDeleted(queue); - } - - private void onQueueCreated(Queue queue) { - for (int i = 0; i < queue.getPartitions(); i++) { - tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); - } - - tbClusterService.onQueueChange(queue); - } - - private void onQueueUpdated(Queue queue, Queue oldQueue) { - int oldPartitions = oldQueue.getPartitions(); - int currentPartitions = queue.getPartitions(); - - if (currentPartitions != oldPartitions) { - if (currentPartitions > oldPartitions) { - log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); - for (int i = oldPartitions; i < currentPartitions; i++) { - tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); - } - tbClusterService.onQueueChange(queue); - } else { - log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); - tbClusterService.onQueueChange(queue); - // TODO: move all the messages left in old partitions and delete topics - } - } else if (!oldQueue.equals(queue)) { - tbClusterService.onQueueChange(queue); - } - } - - private void onQueueDeleted(Queue queue) { - tbClusterService.onQueueDelete(queue); -// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId); + tbClusterService.onQueuesDelete(List.of(queue)); } @Override @@ -176,26 +129,56 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}", newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds); } - tenantIds.forEach(tenantId -> { - toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); - toUpdate.forEach(key -> { - Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); - Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, key); - queueToUpdate.setId(foundQueue.getId()); - queueToUpdate.setCreatedTime(foundQueue.getCreatedTime()); + List updated = new ArrayList<>(); + List deleted = new ArrayList<>(); + for (TenantId tenantId : tenantIds) { + for (String name : toCreate) { + updated.add(new Queue(tenantId, newQueues.get(name))); + } - if (!queueToUpdate.equals(foundQueue)) { - saveQueue(queueToUpdate); + for (String name : toUpdate) { + Queue queue = new Queue(tenantId, newQueues.get(name)); + Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, name); + if (foundQueue != null) { + queue.setId(foundQueue.getId()); + queue.setCreatedTime(foundQueue.getCreatedTime()); } - }); + if (!queue.equals(foundQueue)) { + updated.add(queue); + createTopicsIfNeeded(queue, foundQueue); + } + } - toRemove.forEach(q -> { - Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); - QueueId queueIdForRemove = queue.getId(); - deleteQueue(tenantId, queueIdForRemove); + for (String name : toRemove) { + Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, name); + deleted.add(queue); + } + } + + if (!updated.isEmpty()) { + updated = updated.stream() + .map(queueService::saveQueue) + .collect(Collectors.toList()); + tbClusterService.onQueuesUpdate(updated); + } + if (!deleted.isEmpty()) { + deleted.forEach(queue -> { + queueService.deleteQueue(queue.getTenantId(), queue.getId()); }); - }); + tbClusterService.onQueuesDelete(deleted); + } + } + + private void createTopicsIfNeeded(Queue queue, Queue oldQueue) { + int newPartitions = queue.getPartitions(); + int oldPartitions = oldQueue != null ? oldQueue.getPartitions() : 0; + for (int i = oldPartitions; i < newPartitions; i++) { + tbQueueAdmin.createTopicIfNotExists( + new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), + queue.getCustomProperties() + ); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 14930912d1..1f27f06502 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -59,6 +59,8 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; +import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg; +import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -68,8 +70,8 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -77,9 +79,11 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.thingsboard.server.common.util.ProtoUtils.toProto; @@ -552,40 +556,40 @@ public class DefaultTbClusterService implements TbClusterService { } @Override - public void onQueueChange(Queue queue) { - log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); + public void onQueuesUpdate(List queues) { + List queueUpdateMsgs = queues.stream() + .map(queue -> QueueUpdateMsg.newBuilder() + .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) + .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) + .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) + .setQueueName(queue.getName()) + .setQueueTopic(queue.getTopic()) + .setPartitions(queue.getPartitions()) + .build()) + .collect(Collectors.toList()); - TransportProtos.QueueUpdateMsg queueUpdateMsg = TransportProtos.QueueUpdateMsg.newBuilder() - .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) - .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) - .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) - .setQueueName(queue.getName()) - .setQueueTopic(queue.getTopic()) - .setPartitions(queue.getPartitions()) - .build(); - - ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); + ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } @Override - public void onQueueDelete(Queue queue) { - log.trace("[{}][{}] Processing queue delete [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); + public void onQueuesDelete(List queues) { + List queueDeleteMsgs = queues.stream() + .map(queue -> QueueDeleteMsg.newBuilder() + .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) + .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) + .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) + .setQueueName(queue.getName()) + .build()) + .collect(Collectors.toList()); - TransportProtos.QueueDeleteMsg queueDeleteMsg = TransportProtos.QueueDeleteMsg.newBuilder() - .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) - .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) - .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) - .setQueueName(queue.getName()) - .build(); - - ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); + ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 8e5fde9ae8..f544c4fe25 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -391,13 +391,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0) { + partitionService.updateQueues(toCoreNotification.getQueueUpdateMsgsList()); callback.onSuccess(); - } else if (toCoreNotification.hasQueueDeleteMsg()) { - TransportProtos.QueueDeleteMsg queue = toCoreNotification.getQueueDeleteMsg(); - partitionService.removeQueue(queue); + } else if (toCoreNotification.getQueueDeleteMsgsCount() > 0) { + partitionService.removeQueues(toCoreNotification.getQueueDeleteMsgsList()); callback.onSuccess(); } else if (toCoreNotification.hasVcResponseMsg()) { vcQueueService.processResponse(toCoreNotification.getVcResponseMsg()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 8cc477999f..6a205ed5a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -36,6 +36,8 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg; +import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -164,11 +166,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< , proto.getResponse(), error); tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); - } else if (nfMsg.hasQueueUpdateMsg()) { - updateQueue(nfMsg.getQueueUpdateMsg()); + } else if (nfMsg.getQueueUpdateMsgsCount() > 0) { + updateQueues(nfMsg.getQueueUpdateMsgsList()); callback.onSuccess(); - } else if (nfMsg.hasQueueDeleteMsg()) { - deleteQueue(nfMsg.getQueueDeleteMsg()); + } else if (nfMsg.getQueueDeleteMsgsCount() > 0) { + deleteQueues(nfMsg.getQueueDeleteMsgsList()); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -176,39 +178,48 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } - private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { - log.info("Received queue update msg: [{}]", queueUpdateMsg); - TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - if (partitionService.isManagedByCurrentService(tenantId)) { - QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); - String queueName = queueUpdateMsg.getQueueName(); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); - Queue queue = queueService.findQueueById(tenantId, queueId); + private void updateQueues(List queueUpdateMsgs) { + boolean partitionsChanged = false; + for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) { + log.info("Received queue update msg: [{}]", queueUpdateMsg); + TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); + if (partitionService.isManagedByCurrentService(tenantId)) { + QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); + String queueName = queueUpdateMsg.getQueueName(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); + Queue queue = queueService.findQueueById(tenantId, queueId); - TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); - Queue oldQueue = consumerManager.getQueue(); - consumerManager.update(queue); + TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); + Queue oldQueue = consumerManager.getQueue(); + consumerManager.update(queue); - if (oldQueue != null && queue.getPartitions() == oldQueue.getPartitions()) { - return; + if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) { + partitionsChanged = true; + } + } else { + partitionsChanged = true; } } - partitionService.updateQueue(queueUpdateMsg); - partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), - new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); + if (partitionsChanged) { + partitionService.updateQueues(queueUpdateMsgs); + partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), + new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); + } } - private void deleteQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { - log.info("Received queue delete msg: [{}]", queueDeleteMsg); - TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - var consumerManager = consumers.remove(queueKey); - if (consumerManager != null) { - consumerManager.delete(true); + private void deleteQueues(List queueDeleteMsgs) { + for (QueueDeleteMsg queueDeleteMsg : queueDeleteMsgs) { + log.info("Received queue delete msg: [{}]", queueDeleteMsg); + TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(true); + } } - partitionService.removeQueue(queueDeleteMsg); + partitionService.removeQueues(queueDeleteMsgs); partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 5ff72e97e3..da3171bfc0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -184,9 +184,9 @@ public class TbCoreConsumerStats { toCoreNfEdgeSyncResponseCounter.increment(); } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { toCoreNfEdgeSyncResponseCounter.increment(); - } else if (msg.hasQueueUpdateMsg()) { + } else if (msg.getQueueUpdateMsgsCount() > 0) { toCoreNfQueueUpdateCounter.increment(); - } else if (msg.hasQueueDeleteMsg()) { + } else if (msg.getQueueDeleteMsgsCount() > 0) { toCoreNfQueueDeleteCounter.increment(); } else if (msg.hasVcResponseMsg()) { toCoreNfVersionControlResponseCounter.increment(); diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 4ea648119b..669b3dda4a 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -315,7 +315,7 @@ public class HashPartitionServiceTest { .setPartitions(isolatedQueue.getPartitions()) .build(); - partitionService_common.updateQueue(queueUpdateMsg); + partitionService_common.updateQueues(List.of(queueUpdateMsg)); partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); // expecting event about no partitions for isolated queue key verifyPartitionChangeEvent(event -> { @@ -323,7 +323,7 @@ public class HashPartitionServiceTest { return event.getPartitionsMap().get(queueKey).isEmpty(); }); - partitionService_dedicated.updateQueue(queueUpdateMsg); + partitionService_dedicated.updateQueues(List.of(queueUpdateMsg)); partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); verifyPartitionChangeEvent(event -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); @@ -342,7 +342,7 @@ public class HashPartitionServiceTest { .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits()) .setQueueName(isolatedQueue.getName()) .build(); - partitionService_dedicated.removeQueue(queueDeleteMsg); + partitionService_dedicated.removeQueues(List.of(queueDeleteMsg)); verifyPartitionChangeEvent(event -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); return event.getPartitionsMap().get(queueKey).isEmpty(); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 233839bb5f..57bf94fec3 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.service.gateway_device.GatewayNotificationsService import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; @@ -92,7 +93,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); verify(topicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any()); @@ -117,7 +118,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); @@ -145,7 +146,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer); when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT); @@ -191,7 +192,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer); when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java index c852a24fa7..49017df678 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java @@ -17,8 +17,12 @@ package org.thingsboard.server.queue; import org.thingsboard.server.common.data.queue.Queue; -public interface TbQueueClusterService { - void onQueueChange(Queue queue); +import java.util.List; + +public interface TbQueueClusterService { + + void onQueuesUpdate(List queues); + + void onQueuesDelete(List queues); - void onQueueDelete(Queue queue); } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index fea4848654..b1362bf27c 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1280,8 +1280,8 @@ message ToCoreNotificationMsg { FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; bytes componentLifecycleMsg = 3 [deprecated = true]; bytes edgeEventUpdateMsg = 4 [deprecated = true]; - QueueUpdateMsg queueUpdateMsg = 5; - QueueDeleteMsg queueDeleteMsg = 6; + repeated QueueUpdateMsg queueUpdateMsgs = 5; + repeated QueueDeleteMsg queueDeleteMsgs = 6; VersionControlResponseMsg vcResponseMsg = 7; bytes toEdgeSyncRequestMsg = 8 [deprecated = true]; bytes fromEdgeSyncResponseMsg = 9 [deprecated = true]; @@ -1307,8 +1307,8 @@ message ToRuleEngineMsg { message ToRuleEngineNotificationMsg { bytes componentLifecycleMsg = 1 [deprecated = true]; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; - QueueUpdateMsg queueUpdateMsg = 3; - QueueDeleteMsg queueDeleteMsg = 4; + repeated QueueUpdateMsg queueUpdateMsgs = 3; + repeated QueueDeleteMsg queueDeleteMsgs = 4; ComponentLifecycleMsgProto componentLifecycle = 5; } @@ -1328,8 +1328,8 @@ message ToTransportMsg { ResourceUpdateMsg resourceUpdateMsg = 12; ResourceDeleteMsg resourceDeleteMsg = 13; UplinkNotificationMsg uplinkNotificationMsg = 14; - QueueUpdateMsg queueUpdateMsg = 15; - QueueDeleteMsg queueDeleteMsg = 16; + repeated QueueUpdateMsg queueUpdateMsgs = 15; + repeated QueueDeleteMsg queueDeleteMsgs = 16; } message UsageStatsKVProto{ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index bfc50e076b..acb26842d5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -171,27 +171,36 @@ public class HashPartitionService implements PartitionService { } @Override - public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { - TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); - partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); - partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); - myPartitions.remove(queueKey); - if (!tenantId.isSysTenantId()) { - tenantRoutingInfoMap.remove(tenantId); + public void updateQueues(List queueUpdateMsgs) { + for (TransportProtos.QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) { + TenantId tenantId = TenantId.fromUUID(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); + partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); + partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); + myPartitions.remove(queueKey); + if (!tenantId.isSysTenantId()) { + tenantRoutingInfoMap.remove(tenantId); + } } } @Override - public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { - TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - myPartitions.remove(queueKey); - partitionTopicsMap.remove(queueKey); - partitionSizesMap.remove(queueKey); - evictTenantInfo(tenantId); + public void removeQueues(List queueDeleteMsgs) { + List queueKeys = queueDeleteMsgs.stream() + .map(queueDeleteMsg -> { + TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); + return new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + }) + .collect(Collectors.toList()); + queueKeys.forEach(queueKey -> { + myPartitions.remove(queueKey); + partitionTopicsMap.remove(queueKey); + partitionSizesMap.remove(queueKey); + evictTenantInfo(queueKey.getTenantId()); + }); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { - publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); + publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream() + .collect(Collectors.toMap(k -> k, k -> Collections.emptySet()))); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index e9049a8085..27417e5f20 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -63,9 +63,9 @@ public interface PartitionService { int countTransportsByType(String type); - void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg); + void updateQueues(List queueUpdateMsgs); - void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + void removeQueues(List queueDeleteMsgs); void removeTenant(TenantId tenantId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 6f962ca269..2d6ab0fb8b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -50,9 +50,9 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -96,10 +96,9 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; import org.thingsboard.server.queue.scheduler.SchedulerComponent; @@ -1056,10 +1055,10 @@ public class DefaultTransportService implements TransportService { log.warn("ResourceDelete - [{}] [{}]", id, mdRez); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg)); }); - } else if (toSessionMsg.hasQueueUpdateMsg()) { - partitionService.updateQueue(toSessionMsg.getQueueUpdateMsg()); - } else if (toSessionMsg.hasQueueDeleteMsg()) { - partitionService.removeQueue(toSessionMsg.getQueueDeleteMsg()); + } else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) { + partitionService.updateQueues(toSessionMsg.getQueueUpdateMsgsList()); + } else if (toSessionMsg.getQueueDeleteMsgsCount() > 0) { + partitionService.removeQueues(toSessionMsg.getQueueDeleteMsgsList()); } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java index c1b0c66875..1bf9e25fc0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java @@ -57,9 +57,6 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ @Autowired private DataValidator queueValidator; -// @Autowired -// private QueueStatsService queueStatsService; - @Override public Queue saveQueue(Queue queue) { log.trace("Executing createOrUpdateQueue [{}]", queue);