diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 7983a0adcd..e2779ba28b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -88,10 +88,8 @@ public class DefaultTbClusterService implements TbClusterService { @Value("${cluster.stats.enabled:false}") private boolean statsEnabled; - @Value("${edges.enabled}") + @Value("${edges.enabled:true}") protected boolean edgesEnabled; - @Value("${service.type:monolith}") - private String serviceType; private final AtomicInteger toCoreMsgs = new AtomicInteger(0); private final AtomicInteger toCoreNfs = new AtomicInteger(0); @@ -584,25 +582,27 @@ public class DefaultTbClusterService implements TbClusterService { } private void doSendQueueNotifications(ToRuleEngineNotificationMsg ruleEngineMsg, ToCoreNotificationMsg coreMsg, ToTransportMsg transportMsg) { - Set tbRuleEngineServices = partitionService.getAllServices(ServiceType.TB_RULE_ENGINE); - for (TransportProtos.ServiceInfo ruleEngineService : tbRuleEngineServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineService.getServiceId()); + Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); + Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); + Set tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); + // No need to push notifications twice + tbTransportServices.removeAll(tbCoreServices); + tbCoreServices.removeAll(tbRuleEngineServices); + + for (String ruleEngineServiceId : tbRuleEngineServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId); producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null); toRuleEngineNfs.incrementAndGet(); } - if (!serviceType.equals("monolith")) { - Set tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE); - for (TransportProtos.ServiceInfo coreService : tbCoreServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId()); - producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); - toCoreNfs.incrementAndGet(); - } - Set tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT); - for (TransportProtos.ServiceInfo transportService : tbTransportServices) { - TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId()); - producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); - toTransportNfs.incrementAndGet(); - } + for (String coreServiceId : tbCoreServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreServiceId); + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null); + toCoreNfs.incrementAndGet(); + } + for (String transportServiceId : tbTransportServices) { + TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); + producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); + toTransportNfs.incrementAndGet(); } } } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java new file mode 100644 index 0000000000..27eeda0b5b --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -0,0 +1,247 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.id.QueueId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.queue.Queue; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.NotificationsTopicService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.queue.util.DataDecodingEncodingService; +import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; +import org.thingsboard.server.service.profile.TbAssetProfileCache; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; + +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Slf4j +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = DefaultTbClusterService.class) +public class DefaultTbClusterServiceTest { + + public static final String MONOLITH = "monolith"; + + public static final String CORE = "core"; + + public static final String RULE_ENGINE = "rule_engine"; + + public static final String TRANSPORT = "transport"; + + @MockBean + protected DataDecodingEncodingService encodingService; + @MockBean + protected TbDeviceProfileCache deviceProfileCache; + @MockBean + protected TbAssetProfileCache assetProfileCache; + @MockBean + protected GatewayNotificationsService gatewayNotificationsService; + @MockBean + protected PartitionService partitionService; + @MockBean + protected TbQueueProducerProvider producerProvider; + + @SpyBean + protected NotificationsTopicService notificationsTopicService; + @SpyBean + protected TbClusterService clusterService; + + @Test + public void testOnQueueChangeSingleMonolith() { + when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(MONOLITH)); + when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(MONOLITH)); + when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(MONOLITH)); + + TbQueueProducer> tbQueueProducer = mock(TbQueueProducer.class); + + when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); + + clusterService.onQueueChange(createTestQueue()); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); + verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any()); + verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_TRANSPORT), any()); + + verify(tbQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH)), any(TbProtoQueueMsg.class), isNull()); + + verify(producerProvider, never()).getTbCoreNotificationsMsgProducer(); + verify(producerProvider, never()).getTransportNotificationsMsgProducer(); + } + + @Test + public void testOnQueueChangeMultipleMonoliths() { + String monolith1 = MONOLITH + 1; + String monolith2 = MONOLITH + 2; + when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(monolith1, monolith2)); + when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(monolith1, monolith2)); + when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(monolith1, monolith2)); + + TbQueueProducer> tbQueueProducer = mock(TbQueueProducer.class); + + when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); + + clusterService.onQueueChange(createTestQueue()); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); + verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any()); + verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_TRANSPORT), any()); + + verify(tbQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2)), any(TbProtoQueueMsg.class), isNull()); + + verify(producerProvider, never()).getTbCoreNotificationsMsgProducer(); + verify(producerProvider, never()).getTransportNotificationsMsgProducer(); + } + + @Test + public void testOnQueueChangeSingleMonolithAndSingleRemoteTransport() { + when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(MONOLITH)); + when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(MONOLITH)); + when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(MONOLITH, TRANSPORT)); + + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TbQueueProducer> tbTransportQueueProducer = mock(TbQueueProducer.class); + + when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer); + when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); + + clusterService.onQueueChange(createTestQueue()); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT); + verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any()); + + verify(tbREQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH)), any(TbProtoQueueMsg.class), isNull()); + + verify(tbTransportQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT)), any(TbProtoQueueMsg.class), isNull()); + verify(tbTransportQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, MONOLITH)), any(TbProtoQueueMsg.class), isNull()); + + verify(tbTransportQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, MONOLITH)), any(TbProtoQueueMsg.class), isNull()); + + verify(producerProvider, never()).getTbCoreNotificationsMsgProducer(); + } + + @Test + public void testOnQueueChangeMultipleMicroservices() { + String monolith1 = MONOLITH + 1; + String monolith2 = MONOLITH + 2; + + String core1 = CORE + 1; + String core2 = CORE + 2; + + String ruleEngine1 = RULE_ENGINE + 1; + String ruleEngine2 = RULE_ENGINE + 2; + + String transport1 = TRANSPORT + 1; + String transport2 = TRANSPORT + 2; + + when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(monolith1, monolith2, ruleEngine1, ruleEngine2)); + when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(monolith1, monolith2, core1, core2)); + when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(monolith1, monolith2, transport1, transport2)); + + TbQueueProducer> tbREQueueProducer = mock(TbQueueProducer.class); + TbQueueProducer> tbCoreQueueProducer = mock(TbQueueProducer.class); + TbQueueProducer> tbTransportQueueProducer = mock(TbQueueProducer.class); + + when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer); + when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer); + when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); + + clusterService.onQueueChange(createTestQueue()); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine1); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine2); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_CORE, core1); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_CORE, core2); + verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_CORE, monolith1); + verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_CORE, monolith2); + + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, transport1); + verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, transport2); + verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith1); + verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2); + + verify(tbREQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbREQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2)), any(TbProtoQueueMsg.class), isNull()); + verify(tbREQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbREQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine2)), any(TbProtoQueueMsg.class), isNull()); + + verify(tbCoreQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, core1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbCoreQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, core2)), any(TbProtoQueueMsg.class), isNull()); + verify(tbCoreQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, monolith1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbCoreQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, monolith2)), any(TbProtoQueueMsg.class), isNull()); + + verify(tbTransportQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transport1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbTransportQueueProducer, times(1)) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transport2)), any(TbProtoQueueMsg.class), isNull()); + verify(tbTransportQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith1)), any(TbProtoQueueMsg.class), isNull()); + verify(tbTransportQueueProducer, never()) + .send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull()); + } + + protected Queue createTestQueue() { + TenantId tenantId = TenantId.SYS_TENANT_ID; + Queue queue = new Queue(new QueueId(UUID.randomUUID())); + queue.setTenantId(tenantId); + queue.setName("Main"); + queue.setTopic("main"); + queue.setPartitions(10); + return queue; + } +} \ No newline at end of file