Improved PartitionService and fixed startup order in docker-compose.yml

This commit is contained in:
Andrii Shvaika 2020-04-27 14:44:29 +03:00
parent a779839081
commit 86f21023fe
3 changed files with 30 additions and 7 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.routing;
import com.datastax.driver.core.utils.UUIDs; import com.datastax.driver.core.utils.UUIDs;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -31,6 +32,8 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -41,6 +44,7 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Slf4j @Slf4j
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -52,6 +56,7 @@ public class ConsistentHashParitionServiceTest {
private TbServiceInfoProvider discoveryService; private TbServiceInfoProvider discoveryService;
private TenantRoutingInfoService routingInfoService; private TenantRoutingInfoService routingInfoService;
private ApplicationEventPublisher applicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher;
private TbQueueRuleEngineSettings ruleEngineSettings;
private String hashFunctionName = "murmur3_128"; private String hashFunctionName = "murmur3_128";
private Integer virtualNodesSize = 16; private Integer virtualNodesSize = 16;
@ -62,12 +67,15 @@ public class ConsistentHashParitionServiceTest {
discoveryService = mock(TbServiceInfoProvider.class); discoveryService = mock(TbServiceInfoProvider.class);
applicationEventPublisher = mock(ApplicationEventPublisher.class); applicationEventPublisher = mock(ApplicationEventPublisher.class);
routingInfoService = mock(TenantRoutingInfoService.class); routingInfoService = mock(TenantRoutingInfoService.class);
clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher); ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
routingInfoService,
applicationEventPublisher,
ruleEngineSettings
);
when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3); ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine");
ReflectionTestUtils.setField(clusterRoutingService, "ruleEnginePartitions", 100);
ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize); ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder() TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
@ -107,8 +115,9 @@ public class ConsistentHashParitionServiceTest {
List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue()); double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", (diff / ITERATIONS) * 100.0) + "%)"); double diffPercent = (diff / ITERATIONS) * 100.0;
System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
Assert.assertTrue(diffPercent < 0.5);
for (Map.Entry<Integer, Integer> entry : data) { for (Map.Entry<Integer, Integer> entry : data) {
System.out.println(entry.getKey() + ": " + entry.getValue()); System.out.println(entry.getKey() + ": " + entry.getValue());
} }

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -61,6 +62,7 @@ public class ConsistentHashPartitionService implements PartitionService {
private final ApplicationEventPublisher applicationEventPublisher; private final ApplicationEventPublisher applicationEventPublisher;
private final TbServiceInfoProvider serviceInfoProvider; private final TbServiceInfoProvider serviceInfoProvider;
private final TenantRoutingInfoService tenantRoutingInfoService; private final TenantRoutingInfoService tenantRoutingInfoService;
private final TbQueueRuleEngineSettings tbQueueRuleEngineSettings;
private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>(); private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>(); private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>(); private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
@ -74,10 +76,14 @@ public class ConsistentHashPartitionService implements PartitionService {
private HashFunction hashFunction; private HashFunction hashFunction;
public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) { public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
TenantRoutingInfoService tenantRoutingInfoService,
ApplicationEventPublisher applicationEventPublisher,
TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
this.serviceInfoProvider = serviceInfoProvider; this.serviceInfoProvider = serviceInfoProvider;
this.tenantRoutingInfoService = tenantRoutingInfoService; this.tenantRoutingInfoService = tenantRoutingInfoService;
this.applicationEventPublisher = applicationEventPublisher; this.applicationEventPublisher = applicationEventPublisher;
this.tbQueueRuleEngineSettings = tbQueueRuleEngineSettings;
} }
@PostConstruct @PostConstruct
@ -85,6 +91,10 @@ public class ConsistentHashPartitionService implements PartitionService {
this.hashFunction = forName(hashFunctionName); this.hashFunction = forName(hashFunctionName);
partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions); partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic); partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions());
});
} }
@Override @Override

View File

@ -70,6 +70,8 @@ services:
- kafka - kafka
- redis - redis
- tb-js-executor - tb-js-executor
- tb-rule-engine1
- tb-rule-engine2
tb-core2: tb-core2:
restart: always restart: always
image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
@ -92,6 +94,8 @@ services:
- kafka - kafka
- redis - redis
- tb-js-executor - tb-js-executor
- tb-rule-engine1
- tb-rule-engine2
tb-rule-engine1: tb-rule-engine1:
restart: always restart: always
image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}" image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"