Merge remote-tracking branch 'origin/master' into develop/3.0
This commit is contained in:
		
						commit
						2728c7cd29
					
				@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.routing;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import com.datastax.driver.core.utils.UUIDs;
 | 
					import com.datastax.driver.core.utils.UUIDs;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
 | 
					import org.junit.Assert;
 | 
				
			||||||
import org.junit.Before;
 | 
					import org.junit.Before;
 | 
				
			||||||
import org.junit.Test;
 | 
					import org.junit.Test;
 | 
				
			||||||
import org.junit.runner.RunWith;
 | 
					import org.junit.runner.RunWith;
 | 
				
			||||||
@ -31,6 +32,8 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
				
			|||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
					import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
				
			||||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
					import org.thingsboard.server.gen.transport.TransportProtos;
 | 
				
			||||||
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
 | 
					import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
 | 
				
			||||||
 | 
					import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
				
			||||||
 | 
					import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.util.ArrayList;
 | 
					import java.util.ArrayList;
 | 
				
			||||||
import java.util.Collections;
 | 
					import java.util.Collections;
 | 
				
			||||||
@ -41,6 +44,7 @@ import java.util.Map;
 | 
				
			|||||||
import java.util.stream.Collectors;
 | 
					import java.util.stream.Collectors;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import static org.mockito.Mockito.mock;
 | 
					import static org.mockito.Mockito.mock;
 | 
				
			||||||
 | 
					import static org.mockito.Mockito.when;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
@RunWith(MockitoJUnitRunner.class)
 | 
					@RunWith(MockitoJUnitRunner.class)
 | 
				
			||||||
@ -52,6 +56,7 @@ public class ConsistentHashParitionServiceTest {
 | 
				
			|||||||
    private TbServiceInfoProvider discoveryService;
 | 
					    private TbServiceInfoProvider discoveryService;
 | 
				
			||||||
    private TenantRoutingInfoService routingInfoService;
 | 
					    private TenantRoutingInfoService routingInfoService;
 | 
				
			||||||
    private ApplicationEventPublisher applicationEventPublisher;
 | 
					    private ApplicationEventPublisher applicationEventPublisher;
 | 
				
			||||||
 | 
					    private TbQueueRuleEngineSettings ruleEngineSettings;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private String hashFunctionName = "murmur3_128";
 | 
					    private String hashFunctionName = "murmur3_128";
 | 
				
			||||||
    private Integer virtualNodesSize = 16;
 | 
					    private Integer virtualNodesSize = 16;
 | 
				
			||||||
@ -62,12 +67,15 @@ public class ConsistentHashParitionServiceTest {
 | 
				
			|||||||
        discoveryService = mock(TbServiceInfoProvider.class);
 | 
					        discoveryService = mock(TbServiceInfoProvider.class);
 | 
				
			||||||
        applicationEventPublisher = mock(ApplicationEventPublisher.class);
 | 
					        applicationEventPublisher = mock(ApplicationEventPublisher.class);
 | 
				
			||||||
        routingInfoService = mock(TenantRoutingInfoService.class);
 | 
					        routingInfoService = mock(TenantRoutingInfoService.class);
 | 
				
			||||||
        clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher);
 | 
					        ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
 | 
				
			||||||
 | 
					        clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
 | 
				
			||||||
 | 
					                routingInfoService,
 | 
				
			||||||
 | 
					                applicationEventPublisher,
 | 
				
			||||||
 | 
					                ruleEngineSettings
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					        when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
 | 
					        ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
 | 
					        ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine");
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "ruleEnginePartitions", 100);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
 | 
					        ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
 | 
				
			||||||
        ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
 | 
					        ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
 | 
				
			||||||
        TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
 | 
					        TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
 | 
				
			||||||
@ -107,8 +115,9 @@ public class ConsistentHashParitionServiceTest {
 | 
				
			|||||||
        List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
 | 
					        List<Map.Entry<Integer, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
 | 
				
			||||||
        long end = System.currentTimeMillis();
 | 
					        long end = System.currentTimeMillis();
 | 
				
			||||||
        double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
 | 
					        double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
 | 
				
			||||||
        System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", (diff / ITERATIONS) * 100.0) + "%)");
 | 
					        double diffPercent = (diff / ITERATIONS) * 100.0;
 | 
				
			||||||
 | 
					        System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
 | 
				
			||||||
 | 
					        Assert.assertTrue(diffPercent < 0.5);
 | 
				
			||||||
        for (Map.Entry<Integer, Integer> entry : data) {
 | 
					        for (Map.Entry<Integer, Integer> entry : data) {
 | 
				
			||||||
            System.out.println(entry.getKey() + ": " + entry.getValue());
 | 
					            System.out.println(entry.getKey() + ": " + entry.getValue());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
				
			|||||||
@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
				
			|||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
					import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
				
			||||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
					import org.thingsboard.server.gen.transport.TransportProtos;
 | 
				
			||||||
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
 | 
					import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
 | 
				
			||||||
 | 
					import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import javax.annotation.PostConstruct;
 | 
					import javax.annotation.PostConstruct;
 | 
				
			||||||
import java.nio.charset.StandardCharsets;
 | 
					import java.nio.charset.StandardCharsets;
 | 
				
			||||||
@ -61,6 +62,7 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
				
			|||||||
    private final ApplicationEventPublisher applicationEventPublisher;
 | 
					    private final ApplicationEventPublisher applicationEventPublisher;
 | 
				
			||||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
					    private final TbServiceInfoProvider serviceInfoProvider;
 | 
				
			||||||
    private final TenantRoutingInfoService tenantRoutingInfoService;
 | 
					    private final TenantRoutingInfoService tenantRoutingInfoService;
 | 
				
			||||||
 | 
					    private final TbQueueRuleEngineSettings tbQueueRuleEngineSettings;
 | 
				
			||||||
    private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>();
 | 
				
			||||||
    private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
 | 
					    private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
 | 
				
			||||||
@ -74,10 +76,14 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    private HashFunction hashFunction;
 | 
					    private HashFunction hashFunction;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) {
 | 
					    public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
 | 
				
			||||||
 | 
					                                          TenantRoutingInfoService tenantRoutingInfoService,
 | 
				
			||||||
 | 
					                                          ApplicationEventPublisher applicationEventPublisher,
 | 
				
			||||||
 | 
					                                          TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
 | 
				
			||||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
					        this.serviceInfoProvider = serviceInfoProvider;
 | 
				
			||||||
        this.tenantRoutingInfoService = tenantRoutingInfoService;
 | 
					        this.tenantRoutingInfoService = tenantRoutingInfoService;
 | 
				
			||||||
        this.applicationEventPublisher = applicationEventPublisher;
 | 
					        this.applicationEventPublisher = applicationEventPublisher;
 | 
				
			||||||
 | 
					        this.tbQueueRuleEngineSettings = tbQueueRuleEngineSettings;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @PostConstruct
 | 
					    @PostConstruct
 | 
				
			||||||
@ -85,6 +91,10 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
				
			|||||||
        this.hashFunction = forName(hashFunctionName);
 | 
					        this.hashFunction = forName(hashFunctionName);
 | 
				
			||||||
        partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
 | 
					        partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
 | 
				
			||||||
        partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
 | 
					        partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
 | 
				
			||||||
 | 
					        tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
 | 
				
			||||||
 | 
					            partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
 | 
				
			||||||
 | 
					            partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getPartitions());
 | 
				
			||||||
 | 
					        });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
 | 
				
			|||||||
@ -70,6 +70,8 @@ services:
 | 
				
			|||||||
      - kafka
 | 
					      - kafka
 | 
				
			||||||
      - redis
 | 
					      - redis
 | 
				
			||||||
      - tb-js-executor
 | 
					      - tb-js-executor
 | 
				
			||||||
 | 
					      - tb-rule-engine1
 | 
				
			||||||
 | 
					      - tb-rule-engine2
 | 
				
			||||||
  tb-core2:
 | 
					  tb-core2:
 | 
				
			||||||
    restart: always
 | 
					    restart: always
 | 
				
			||||||
    image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
 | 
					    image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
 | 
				
			||||||
@ -92,6 +94,8 @@ services:
 | 
				
			|||||||
      - kafka
 | 
					      - kafka
 | 
				
			||||||
      - redis
 | 
					      - redis
 | 
				
			||||||
      - tb-js-executor
 | 
					      - tb-js-executor
 | 
				
			||||||
 | 
					      - tb-rule-engine1
 | 
				
			||||||
 | 
					      - tb-rule-engine2
 | 
				
			||||||
  tb-rule-engine1:
 | 
					  tb-rule-engine1:
 | 
				
			||||||
    restart: always
 | 
					    restart: always
 | 
				
			||||||
    image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
 | 
					    image: "${DOCKER_REPO}/${TB_NODE_DOCKER_NAME}:${TB_VERSION}"
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user