Change of the partition routing strategy
This commit is contained in:
parent
b8de64f3fb
commit
f07d7441b2
@ -621,8 +621,7 @@ queue:
|
||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||
js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||
partitions:
|
||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
|
||||
transport_api:
|
||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||
|
||||
@ -26,14 +26,14 @@ import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.queue.discovery.ConsistentHashPartitionService;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
||||
import org.thingsboard.server.queue.discovery.HashPartitionService;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
|
||||
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
|
||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
@ -48,18 +48,18 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class ConsistentHashParitionServiceTest {
|
||||
public class HashPartitionServiceTest {
|
||||
|
||||
public static final int ITERATIONS = 1000000;
|
||||
private ConsistentHashPartitionService clusterRoutingService;
|
||||
public static final int SERVER_COUNT = 3;
|
||||
private HashPartitionService clusterRoutingService;
|
||||
|
||||
private TbServiceInfoProvider discoveryService;
|
||||
private TenantRoutingInfoService routingInfoService;
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
private TbQueueRuleEngineSettings ruleEngineSettings;
|
||||
|
||||
private String hashFunctionName = "murmur3_128";
|
||||
private Integer virtualNodesSize = 16;
|
||||
private String hashFunctionName = "sha256";
|
||||
|
||||
|
||||
@Before
|
||||
@ -68,25 +68,28 @@ public class ConsistentHashParitionServiceTest {
|
||||
applicationEventPublisher = mock(ApplicationEventPublisher.class);
|
||||
routingInfoService = mock(TenantRoutingInfoService.class);
|
||||
ruleEngineSettings = mock(TbQueueRuleEngineSettings.class);
|
||||
clusterRoutingService = new ConsistentHashPartitionService(discoveryService,
|
||||
clusterRoutingService = new HashPartitionService(discoveryService,
|
||||
routingInfoService,
|
||||
applicationEventPublisher,
|
||||
ruleEngineSettings
|
||||
);
|
||||
when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList());
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10);
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
|
||||
ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize);
|
||||
TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
|
||||
.setServiceId("100.96.1.1")
|
||||
.setServiceId("tb-core-0")
|
||||
.setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
|
||||
.setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
|
||||
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
|
||||
.build();
|
||||
// when(discoveryService.getServiceInfo()).thenReturn(currentServer);
|
||||
List<TransportProtos.ServiceInfo> otherServers = new ArrayList<>();
|
||||
for (int i = 1; i < 30; i++) {
|
||||
for (int i = 1; i < SERVER_COUNT; i++) {
|
||||
otherServers.add(TransportProtos.ServiceInfo.newBuilder()
|
||||
.setServiceId("100.96." + i * 2 + "." + i)
|
||||
.setServiceId("tb-rule-" + i)
|
||||
.setTenantIdMSB(TenantId.NULL_UUID.getMostSignificantBits())
|
||||
.setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits())
|
||||
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
|
||||
.build());
|
||||
}
|
||||
@ -116,12 +119,11 @@ public class ConsistentHashParitionServiceTest {
|
||||
long end = System.currentTimeMillis();
|
||||
double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
|
||||
double diffPercent = (diff / ITERATIONS) * 100.0;
|
||||
System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
|
||||
System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
|
||||
Assert.assertTrue(diffPercent < 0.5);
|
||||
for (Map.Entry<Integer, Integer> entry : data) {
|
||||
System.out.println(entry.getKey() + ": " + entry.getValue());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.discovery;
|
||||
import com.google.common.hash.HashCode;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
@ -35,6 +36,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -48,7 +50,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ConsistentHashPartitionService implements PartitionService {
|
||||
public class HashPartitionService implements PartitionService {
|
||||
|
||||
@Value("${queue.core.topic}")
|
||||
private String coreTopic;
|
||||
@ -56,8 +58,6 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
private Integer corePartitions;
|
||||
@Value("${queue.partitions.hash_function_name:murmur3_128}")
|
||||
private String hashFunctionName;
|
||||
@Value("${queue.partitions.virtual_nodes_size:16}")
|
||||
private Integer virtualNodesSize;
|
||||
|
||||
private final ApplicationEventPublisher applicationEventPublisher;
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
@ -76,10 +76,10 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
|
||||
private HashFunction hashFunction;
|
||||
|
||||
public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider,
|
||||
TenantRoutingInfoService tenantRoutingInfoService,
|
||||
ApplicationEventPublisher applicationEventPublisher,
|
||||
TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
|
||||
public HashPartitionService(TbServiceInfoProvider serviceInfoProvider,
|
||||
TenantRoutingInfoService tenantRoutingInfoService,
|
||||
ApplicationEventPublisher applicationEventPublisher,
|
||||
TbQueueRuleEngineSettings tbQueueRuleEngineSettings) {
|
||||
this.serviceInfoProvider = serviceInfoProvider;
|
||||
this.tenantRoutingInfoService = tenantRoutingInfoService;
|
||||
this.applicationEventPublisher = applicationEventPublisher;
|
||||
@ -128,20 +128,22 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
|
||||
logServiceInfo(currentService);
|
||||
otherServices.forEach(this::logServiceInfo);
|
||||
Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles = new HashMap<>();
|
||||
addNode(circles, currentService);
|
||||
Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
|
||||
addNode(queueServicesMap, currentService);
|
||||
for (ServiceInfo other : otherServices) {
|
||||
addNode(circles, other);
|
||||
addNode(queueServicesMap, other);
|
||||
}
|
||||
queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));
|
||||
|
||||
ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
|
||||
TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
|
||||
myPartitions = new ConcurrentHashMap<>();
|
||||
partitionSizes.forEach((type, size) -> {
|
||||
ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myIsolatedOrSystemTenantId);
|
||||
partitionSizes.forEach((serviceQueue, size) -> {
|
||||
ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
|
||||
for (int i = 0; i < size; i++) {
|
||||
ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceQueueKey), i);
|
||||
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
|
||||
if (currentService.equals(serviceInfo)) {
|
||||
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(type, getSystemOrIsolatedTenantId(serviceInfo));
|
||||
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
|
||||
myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
|
||||
}
|
||||
}
|
||||
@ -293,7 +295,7 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
return new TenantId(new UUID(serviceInfo.getTenantIdMSB(), serviceInfo.getTenantIdLSB()));
|
||||
}
|
||||
|
||||
private void addNode(Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
|
||||
private void addNode(Map<ServiceQueueKey, List<ServiceInfo>> queueServiceList, ServiceInfo instance) {
|
||||
TenantId tenantId = getSystemOrIsolatedTenantId(instance);
|
||||
for (String serviceTypeStr : instance.getServiceTypesList()) {
|
||||
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
|
||||
@ -302,34 +304,20 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType, queue.getName()), tenantId);
|
||||
partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getPartitions());
|
||||
partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queue.getName()), queue.getTopic());
|
||||
for (int i = 0; i < virtualNodesSize; i++) {
|
||||
circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
|
||||
}
|
||||
queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
|
||||
}
|
||||
} else {
|
||||
ServiceQueueKey serviceQueueKey = new ServiceQueueKey(new ServiceQueue(serviceType), tenantId);
|
||||
for (int i = 0; i < virtualNodesSize; i++) {
|
||||
circles.computeIfAbsent(serviceQueueKey, key -> new ConsistentHashCircle<>()).put(hash(instance, i).asLong(), instance);
|
||||
}
|
||||
queueServiceList.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(instance);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ServiceInfo resolveByPartitionIdx(ConsistentHashCircle<ServiceInfo> circle, Integer partitionIdx) {
|
||||
if (circle == null || circle.isEmpty()) {
|
||||
private ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, Integer partitionIdx) {
|
||||
if (servers == null || servers.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Long hash = hashFunction.newHasher().putInt(partitionIdx).hash().asLong();
|
||||
if (!circle.containsKey(hash)) {
|
||||
ConcurrentNavigableMap<Long, ServiceInfo> tailMap = circle.tailMap(hash);
|
||||
hash = tailMap.isEmpty() ?
|
||||
circle.firstKey() : tailMap.firstKey();
|
||||
}
|
||||
return circle.get(hash);
|
||||
}
|
||||
|
||||
private HashCode hash(ServiceInfo instance, int i) {
|
||||
return hashFunction.newHasher().putString(instance.getServiceId(), StandardCharsets.UTF_8).putInt(i).hash();
|
||||
return servers.get(partitionIdx % servers.size());
|
||||
}
|
||||
|
||||
public static HashFunction forName(String name) {
|
||||
@ -338,12 +326,11 @@ public class ConsistentHashPartitionService implements PartitionService {
|
||||
return Hashing.murmur3_32();
|
||||
case "murmur3_128":
|
||||
return Hashing.murmur3_128();
|
||||
case "crc32":
|
||||
return Hashing.crc32();
|
||||
case "md5":
|
||||
return Hashing.md5();
|
||||
case "sha256":
|
||||
return Hashing.sha256();
|
||||
default:
|
||||
throw new IllegalArgumentException("Can't find hash function with name " + name);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user