From 547713f30a9b4ab4a9c806fb5922a4de65122ad6 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 2 May 2022 23:44:43 +0200 Subject: [PATCH] update device profiles queue by tenant profile --- .../entity/queue/DefaultTbQueueService.java | 85 +++++++++++++++---- .../queue/DefaultTbClusterService.java | 44 +++++----- .../queue/DefaultTbCoreConsumerService.java | 6 +- .../DefaultTbRuleEngineConsumerService.java | 4 +- .../processing/AbstractConsumerService.java | 7 +- .../queue/discovery/HashPartitionService.java | 5 ++ .../queue/discovery/PartitionService.java | 2 + .../queue/discovery/QueueRoutingInfo.java | 2 +- .../service/DefaultTransportService.java | 1 + 9 files changed, 111 insertions(+), 45 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entity/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entity/queue/DefaultTbQueueService.java index 37556d8b95..e58c4450fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/entity/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entity/queue/DefaultTbQueueService.java @@ -19,15 +19,19 @@ import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.TbQueueClusterService; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.ArrayList; @@ -41,9 +45,12 @@ import java.util.stream.Collectors; @TbCoreComponent @AllArgsConstructor public class DefaultTbQueueService implements TbQueueService { + private static final String MAIN = "Main"; + private final QueueService queueService; - private final TbQueueClusterService queueClusterService; + private final TbClusterService tbClusterService; private final TbQueueAdmin tbQueueAdmin; + private final DeviceProfileService deviceProfileService; @Override public Queue saveQueue(Queue queue) { @@ -90,8 +97,8 @@ public class DefaultTbQueueService implements TbQueueService { } } - if (queueClusterService != null) { - queueClusterService.onQueueChange(queue); + if (tbClusterService != null) { + tbClusterService.onQueueChange(queue); } } @@ -106,13 +113,13 @@ public class DefaultTbQueueService implements TbQueueService { tbQueueAdmin.createTopicIfNotExists( new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); } - if (queueClusterService != null) { - queueClusterService.onQueueChange(queue); + if (tbClusterService != null) { + tbClusterService.onQueueChange(queue); } } else { log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); - if (queueClusterService != null) { - queueClusterService.onQueueChange(queue); + if (tbClusterService != null) { + tbClusterService.onQueueChange(queue); } await(); for (int i = currentPartitions; i < oldPartitions; i++) { @@ -120,14 +127,14 @@ public class DefaultTbQueueService implements TbQueueService { new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); } } - } else if (!oldQueue.equals(queue) && queueClusterService != null) { - queueClusterService.onQueueChange(queue); + } else if (!oldQueue.equals(queue) && tbClusterService != null) { + tbClusterService.onQueueChange(queue); } } private void onQueueDeleted(TenantId tenantId, Queue queue) { - if (queueClusterService != null) { - queueClusterService.onQueueDelete(queue); + if (tbClusterService != null) { + tbClusterService.onQueueDelete(queue); await(); } // queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId); @@ -151,7 +158,6 @@ public class DefaultTbQueueService implements TbQueueService { @Override public void updateQueuesByTenants(List tenantIds, TenantProfile newTenantProfile, TenantProfile oldTenantProfile) { - boolean oldIsolated = oldTenantProfile != null && oldTenantProfile.isIsolatedTbRuleEngine(); boolean newIsolated = newTenantProfile.isIsolatedTbRuleEngine(); @@ -199,9 +205,35 @@ public class DefaultTbQueueService implements TbQueueService { } tenantIds.forEach(tenantId -> { - toRemove.forEach(q -> deleteQueueByQueueName(tenantId, q)); + Map> deviceProfileQueues; - toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); + if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) { + List deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData(); + deviceProfileQueues = deviceProfiles.stream() + .filter(dp -> dp.getDefaultQueueId() != null) + .collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId)); + } else { + deviceProfileQueues = Collections.emptyMap(); + } + + Map createdQueues = toCreate.stream() + .map(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))) + .collect(Collectors.toMap(Queue::getName, Queue::getId)); + + // assigning created queues to device profiles instead of system queues + if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) { + deviceProfileQueues.forEach((queueId, list) -> { + Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId); + QueueId queueIdToAssign = createdQueues.get(queue.getName()); + if (queueIdToAssign == null) { + queueIdToAssign = createdQueues.get(MAIN); + } + for (DeviceProfile deviceProfile : list) { + deviceProfile.setDefaultQueueId(queueIdToAssign); + saveDeviceProfile(deviceProfile); + } + }); + } toUpdate.forEach(key -> { Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); @@ -215,7 +247,30 @@ public class DefaultTbQueueService implements TbQueueService { saveQueue(queueToUpdate); } }); + + toRemove.forEach(q -> { + Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); + QueueId queueIdForRemove = queue.getId(); + if (deviceProfileQueues.containsKey(queueIdForRemove)) { + Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q); + if (foundQueue == null) { + foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN); + } + QueueId newQueueId = foundQueue.getId(); + deviceProfileQueues.get(queueIdForRemove).stream() + .peek(dp -> dp.setDefaultQueueId(newQueueId)) + .forEach(this::saveDeviceProfile); + } + deleteQueue(tenantId, queueIdForRemove); + }); }); } + //TODO: remove after implementing TbDeviceProfileService + private void saveDeviceProfile(DeviceProfile deviceProfile) { + DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); + tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); + tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index f1ce95f166..2806c35cda 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -86,6 +86,8 @@ public class DefaultTbClusterService implements TbClusterService { private boolean statsEnabled; @Value("${edges.enabled}") protected boolean edgesEnabled; + @Value("${service.type:monolith}") + private String serviceType; private final AtomicInteger toCoreMsgs = new AtomicInteger(0); private final AtomicInteger toCoreNfs = new AtomicInteger(0); @@ -503,10 +505,10 @@ public class DefaultTbClusterService implements TbClusterService { .setPartitions(queue.getPartitions()) .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - doSendQueueNotifications(transportMsg, coreMsg, ruleEngineMsg); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); + doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } @Override @@ -521,32 +523,32 @@ public class DefaultTbClusterService implements TbClusterService { .setQueueName(queue.getName()) .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - doSendQueueNotifications(transportMsg, coreMsg, ruleEngineMsg); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); + doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } - private void doSendQueueNotifications(ToTransportMsg transportMsg, ToCoreNotificationMsg coreMsg, ToRuleEngineNotificationMsg ruleEngineMsg) { - Set tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT); - for (TransportProtos.ServiceInfo transportService : tbTransportServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId()); - producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); - toTransportNfs.incrementAndGet(); - } - - Set tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE); - for (TransportProtos.ServiceInfo coreService : tbCoreServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId()); - producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); - toCoreNfs.incrementAndGet(); - } - + private void doSendQueueNotifications(ToRuleEngineNotificationMsg ruleEngineMsg, ToCoreNotificationMsg coreMsg, ToTransportMsg transportMsg) { Set tbRuleEngineServices = partitionService.getAllServices(ServiceType.TB_RULE_ENGINE); for (TransportProtos.ServiceInfo ruleEngineService : tbRuleEngineServices) { TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineService.getServiceId()); producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); toRuleEngineNfs.incrementAndGet(); } + if (!serviceType.equals("monolith")) { + Set tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE); + for (TransportProtos.ServiceInfo coreService : tbCoreServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId()); + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); + toCoreNfs.incrementAndGet(); + } + Set tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT); + for (TransportProtos.ServiceInfo transportService : tbTransportServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId()); + producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); + toTransportNfs.incrementAndGet(); + } + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 7113272187..bec0dceed3 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -117,7 +117,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> usageStatsConsumer; private final TbQueueConsumer> firmwareStatesConsumer; @@ -139,7 +138,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> nfConsumer; public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, - TbApiUsageStateService apiUsageStateService, TbQueueConsumer> nfConsumer) { + TbApiUsageStateService apiUsageStateService, PartitionService partitionService, + TbQueueConsumer> nfConsumer) { this.actorContext = actorContext; this.encodingService = encodingService; this.tenantProfileCache = tenantProfileCache; this.deviceProfileCache = deviceProfileCache; this.apiUsageStateService = apiUsageStateService; + this.partitionService = partitionService; this.nfConsumer = nfConsumer; } @@ -166,6 +170,7 @@ public abstract class AbstractConsumerService profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); if (profileOpt.isPresent()) { Tenant tenant = profileOpt.get(); + partitionService.removeTenant(tenant.getId()); boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); if (updated) { rateLimitService.update(tenant.getId());