Merge branch 'master' of github.com:thingsboard/thingsboard into develop/3.0
This commit is contained in:
		
						commit
						2a7e220fc3
					
				@ -620,8 +620,7 @@ queue:
 | 
			
		||||
      notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
 | 
			
		||||
      js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
 | 
			
		||||
  partitions:
 | 
			
		||||
    hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
 | 
			
		||||
    virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
 | 
			
		||||
    hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
 | 
			
		||||
  transport_api:
 | 
			
		||||
    requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
 | 
			
		||||
    responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
 | 
			
		||||
 | 
			
		||||
@ -26,14 +26,14 @@ import org.springframework.context.ApplicationEventPublisher;
 | 
			
		||||
import org.springframework.test.util.ReflectionTestUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.ConsistentHashPartitionService;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.HashPartitionService;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.ServiceType;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
@ -48,18 +48,18 @@ import static org.mockito.Mockito.when;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class ConsistentHashParitionServiceTest {
 | 
			
		||||
public class HashPartitionServiceTest {
 | 
			
		||||
 | 
			
		||||
    public static final int ITERATIONS = 1000000;
 | 
			
		||||
    private ConsistentHashPartitionService clusterRoutingService;
 | 
			
		||||
    public static final int SERVER_COUNT = 3;
 | 
			
		||||
    private HashPartitionService clusterRoutingService;
 | 
			
		||||
 | 
			
		||||
    private TbServiceInfoProvider discoveryService;
 | 
			
		||||
    private TenantRoutingInfoService routingInfoService;
 | 
			
		||||
    private ApplicationEventPublisher applicationEventPublisher;
 | 
			
		||||
    private TbQueueRuleEngineSettings ruleEngineSettings;
 | 
			
		||||
 | 
			
		||||
    private String hashFunctionName = "murmur3_128";
 | 
			
		||||
    private Integer virtualNodesSize = 16;
 | 
			
		||||
    private String hashFunctionName = "sha256";
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
@ -68,25 +68,28 @@ public class ConsistentHashParitionServiceTest {
 | 
			
		||||
        applicationEventPublisher = mock(ApplicationEventPublisher.class);
 | 
			
		||||
        routingInfoService = mock(TenantRoutingInfoService.class);
 | 
			
		||||
        ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
 | 
			
		||||
        clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
 | 
			
		||||
        clusterRoutingService = new HashPartitionService(discoveryService,
 | 
			
		||||
                routingInfoService,
 | 
			
		||||
                applicationEventPublisher,
 | 
			
		||||
                ruleEngineSettings
 | 
			
		||||
        );
 | 
			
		||||
        when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
 | 
			
		||||
        ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
 | 
			
		||||
        ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
 | 
			
		||||
        ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10);
 | 
			
		||||
        ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
 | 
			
		||||
        ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
 | 
			
		||||
        TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
 | 
			
		||||
                .setServiceId("100.96.1.1")
 | 
			
		||||
                .setServiceId("tb-core-0")
 | 
			
		||||
                .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
 | 
			
		||||
                .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
 | 
			
		||||
                .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
 | 
			
		||||
                .build();
 | 
			
		||||
//        when(discoveryService.getServiceInfo()).thenReturn(currentServer);
 | 
			
		||||
        List<TransportProtos.ServiceInfo> otherServers = new ArrayList<>();
 | 
			
		||||
        for (int i = 1; i < 30; i++) {
 | 
			
		||||
        for (int i = 1; i < SERVER_COUNT; i++) {
 | 
			
		||||
            otherServers.add(TransportProtos.ServiceInfo.newBuilder()
 | 
			
		||||
                    .setServiceId("100.96." + i * 2 + "." + i)
 | 
			
		||||
                    .setServiceId("tb-rule-" + i)
 | 
			
		||||
                    .setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
 | 
			
		||||
                    .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
 | 
			
		||||
                    .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
 | 
			
		||||
                    .build());
 | 
			
		||||
        }
 | 
			
		||||
@ -116,12 +119,11 @@ public class ConsistentHashParitionServiceTest {
 | 
			
		||||
        long end = System.currentTimeMillis();
 | 
			
		||||
        double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
 | 
			
		||||
        double diffPercent = (diff / ITERATIONS) * 100.0;
 | 
			
		||||
        System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
 | 
			
		||||
        System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
 | 
			
		||||
        Assert.assertTrue(diffPercent < 0.5);
 | 
			
		||||
        for (Map.Entry<Integer, Integer> entry : data) {
 | 
			
		||||
            System.out.println(entry.getKey() + ": " + entry.getValue());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.discovery;
 | 
			
		||||
import com.google.common.hash.HashCode;
 | 
			
		||||
import com.google.common.hash.HashFunction;
 | 
			
		||||
import com.google.common.hash.Hashing;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.context.ApplicationEventPublisher;
 | 
			
		||||
@ -35,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import java.nio.charset.StandardCharsets;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -48,7 +50,7 @@ import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
public class HashPartitionService implements PartitionService {
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.core.topic}")
 | 
			
		||||
    private String coreTopic;
 | 
			
		||||
@ -56,8 +58,6 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
    private Integer corePartitions;
 | 
			
		||||
    @Value("${queue.partitions.hash_function_name:murmur3_128}")
 | 
			
		||||
    private String hashFunctionName;
 | 
			
		||||
    @Value("${queue.partitions.virtual_nodes_size:16}")
 | 
			
		||||
    private Integer virtualNodesSize;
 | 
			
		||||
 | 
			
		||||
    private final ApplicationEventPublisher applicationEventPublisher;
 | 
			
		||||
    private final TbServiceInfoProvider serviceInfoProvider;
 | 
			
		||||
@ -76,10 +76,10 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
 | 
			
		||||
    private HashFunction hashFunction;
 | 
			
		||||
 | 
			
		||||
    public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                          TenantRoutingInfoService tenantRoutingInfoService,
 | 
			
		||||
                                          ApplicationEventPublisher applicationEventPublisher,
 | 
			
		||||
                                          TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
 | 
			
		||||
    public HashPartitionService(TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
                                TenantRoutingInfoService tenantRoutingInfoService,
 | 
			
		||||
                                ApplicationEventPublisher applicationEventPublisher,
 | 
			
		||||
                                TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
 | 
			
		||||
        this.serviceInfoProvider = serviceInfoProvider;
 | 
			
		||||
        this.tenantRoutingInfoService = tenantRoutingInfoService;
 | 
			
		||||
        this.applicationEventPublisher = applicationEventPublisher;
 | 
			
		||||
@ -128,20 +128,22 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
    public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
 | 
			
		||||
        logServiceInfo(currentService);
 | 
			
		||||
        otherServices.forEach(this::logServiceInfo);
 | 
			
		||||
        Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles = new HashMap<>();
 | 
			
		||||
        addNode(circles, currentService);
 | 
			
		||||
        Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
 | 
			
		||||
        addNode(queueServicesMap, currentService);
 | 
			
		||||
        for (ServiceInfo other : otherServices) {
 | 
			
		||||
            addNode(circles, other);
 | 
			
		||||
            addNode(queueServicesMap, other);
 | 
			
		||||
        }
 | 
			
		||||
        queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
 | 
			
		||||
 | 
			
		||||
        ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
 | 
			
		||||
        TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
 | 
			
		||||
        myPartitions = new ConcurrentHashMap<>();
 | 
			
		||||
        partitionSizes.forEach((type, size) -> {
 | 
			
		||||
            ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myIsolatedOrSystemTenantId);
 | 
			
		||||
        partitionSizes.forEach((serviceQueue, size) -> {
 | 
			
		||||
            ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
 | 
			
		||||
            for (int i = 0; i < size; i++) {
 | 
			
		||||
                ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceQueueKey), i);
 | 
			
		||||
                ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
 | 
			
		||||
                if (currentService.equals(serviceInfo)) {
 | 
			
		||||
                    ServiceQueueKey serviceQueueKey = new ServiceQueueKey(type, getSystemOrIsolatedTenantId(serviceInfo));
 | 
			
		||||
                    ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
 | 
			
		||||
                    myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@ -293,7 +295,7 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
        return new TenantId(new UUID(serviceInfo.getTenantIdMSB(), serviceInfo.getTenantIdLSB()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void addNode(Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
 | 
			
		||||
    private void addNode(Map<ServiceQueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) {
 | 
			
		||||
        TenantId tenantId = getSystemOrIsolatedTenantId(instance);
 | 
			
		||||
        for (String serviceTypeStr : instance.getServiceTypesList()) {
 | 
			
		||||
            ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
 | 
			
		||||
@ -302,34 +304,20 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
                    ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType, queue.getName()), tenantId);
 | 
			
		||||
                    partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getPartitions());
 | 
			
		||||
                    partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getTopic());
 | 
			
		||||
                    for (int i = 0; i < virtualNodesSize; i++) {
 | 
			
		||||
                        circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
 | 
			
		||||
                    }
 | 
			
		||||
                    queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
 | 
			
		||||
                }
 | 
			
		||||
            } else {
 | 
			
		||||
                ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType), tenantId);
 | 
			
		||||
                for (int i = 0; i < virtualNodesSize; i++) {
 | 
			
		||||
                    circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
 | 
			
		||||
                }
 | 
			
		||||
                queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ServiceInfo resolveByPartitionIdx(ConsistentHashCircle<ServiceInfo> circle, Integer partitionIdx) {
 | 
			
		||||
        if (circle == null || circle.isEmpty()) {
 | 
			
		||||
    private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) {
 | 
			
		||||
        if (servers == null || servers.isEmpty()) {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
        Long hash = hashFunction.newHasher().putInt(partitionIdx).hash().asLong();
 | 
			
		||||
        if (!circle.containsKey(hash)) {
 | 
			
		||||
            ConcurrentNavigableMap<Long, ServiceInfo> tailMap = circle.tailMap(hash);
 | 
			
		||||
            hash = tailMap.isEmpty() ?
 | 
			
		||||
                    circle.firstKey() : tailMap.firstKey();
 | 
			
		||||
        }
 | 
			
		||||
        return circle.get(hash);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private HashCode hash(ServiceInfo instance, int i) {
 | 
			
		||||
        return hashFunction.newHasher().putString(instance.getServiceId(), StandardCharsets.UTF_8).putInt(i).hash();
 | 
			
		||||
        return servers.get(partitionIdx % servers.size());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static HashFunction forName(String name) {
 | 
			
		||||
@ -338,12 +326,11 @@ public class ConsistentHashPartitionService implements PartitionService {
 | 
			
		||||
                return Hashing.murmur3_32();
 | 
			
		||||
            case "murmur3_128":
 | 
			
		||||
                return Hashing.murmur3_128();
 | 
			
		||||
            case "crc32":
 | 
			
		||||
                return Hashing.crc32();
 | 
			
		||||
            case "md5":
 | 
			
		||||
                return Hashing.md5();
 | 
			
		||||
            case "sha256":
 | 
			
		||||
                return Hashing.sha256();
 | 
			
		||||
            default:
 | 
			
		||||
                throw new IllegalArgumentException("Can't find hash function with name " + name);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user