From 86f21023febedb362e8f991a97886d5cee295dec Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 27 Apr 2020 14:44:29 +0300 Subject: [PATCH] Improved PartitionService and fixed startup order in docker-compose.yml --- .../ConsistentHashParitionServiceTest.java | 21 +++++++++++++------ .../ConsistentHashPartitionService.java | 12 ++++++++++- docker/docker-compose.yml | 4 ++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java index 3f7619ed1e..6b03482c0e 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.routing; import com.datastax.driver.core.utils.UUIDs; import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,6 +32,8 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +44,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @Slf4j @RunWith(MockitoJUnitRunner.class) @@ -52,6 +56,7 @@ public class ConsistentHashParitionServiceTest { private TbServiceInfoProvider discoveryService; private TenantRoutingInfoService routingInfoService; private ApplicationEventPublisher applicationEventPublisher; + private TbQueueRuleEngineSettings ruleEngineSettings; private String hashFunctionName = "murmur3_128"; private Integer virtualNodesSize = 16; @@ -62,12 +67,15 @@ public class ConsistentHashParitionServiceTest { discoveryService = mock(TbServiceInfoProvider.class); applicationEventPublisher = mock(ApplicationEventPublisher.class); routingInfoService = mock(TenantRoutingInfoService.class); - clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher); + ruleEngineSettings = mock(TbQueueRuleEngineSettings.class); + clusterRoutingService = new ConsistentHashPartitionService(discoveryService, + routingInfoService, + applicationEventPublisher, + ruleEngineSettings + ); + when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList()); ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3); - ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine"); - ReflectionTestUtils.setField(clusterRoutingService, "ruleEnginePartitions", 100); - ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize); TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder() @@ -107,8 +115,9 @@ public class ConsistentHashParitionServiceTest { List> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); long end = System.currentTimeMillis(); double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue()); - System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", (diff / ITERATIONS) * 100.0) + "%)"); - + double diffPercent = (diff / ITERATIONS) * 100.0; + System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)"); + Assert.assertTrue(diffPercent < 0.5); for (Map.Entry entry : data) { System.out.println(entry.getKey() + ": " + entry.getValue()); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java index df25ea3ba9..918570bc56 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; @@ -61,6 +62,7 @@ public class ConsistentHashPartitionService implements PartitionService { private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; private final TenantRoutingInfoService tenantRoutingInfoService; + private final TbQueueRuleEngineSettings tbQueueRuleEngineSettings; private final ConcurrentMap partitionTopics = new ConcurrentHashMap<>(); private final ConcurrentMap partitionSizes = new ConcurrentHashMap<>(); private final ConcurrentMap tenantRoutingInfoMap = new ConcurrentHashMap<>(); @@ -74,10 +76,14 @@ public class ConsistentHashPartitionService implements PartitionService { private HashFunction hashFunction; - public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) { + public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, + TenantRoutingInfoService tenantRoutingInfoService, + ApplicationEventPublisher applicationEventPublisher, + TbQueueRuleEngineSettings tbQueueRuleEngineSettings) { this.serviceInfoProvider = serviceInfoProvider; this.tenantRoutingInfoService = tenantRoutingInfoService; this.applicationEventPublisher = applicationEventPublisher; + this.tbQueueRuleEngineSettings = tbQueueRuleEngineSettings; } @PostConstruct @@ -85,6 +91,10 @@ public class ConsistentHashPartitionService implements PartitionService { this.hashFunction = forName(hashFunctionName); partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions); partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic); + tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> { + partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic()); + partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions()); + }); } @Override diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 783a5ade46..2f0d0fb3b3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -70,6 +70,8 @@ services: - kafka - redis - tb-js-executor + - tb-rule-engine1 + - tb-rule-engine2 tb-core2: restart: always image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" @@ -92,6 +94,8 @@ services: - kafka - redis - tb-js-executor + - tb-rule-engine1 + - tb-rule-engine2 tb-rule-engine1: restart: always image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"