Improve hash algorithm to affect only specific tenant queues
This commit is contained in:
parent
faf024b610
commit
24c1424335
@ -15,7 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.queue.discovery;
|
package org.thingsboard.server.queue.discovery;
|
||||||
|
|
||||||
|
import com.datastax.driver.core.utils.UUIDs;
|
||||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -30,12 +32,15 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
|||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -107,13 +112,15 @@ public class HashPartitionServiceTest {
|
|||||||
map.put(partition, map.getOrDefault(partition, 0) + 1);
|
map.put(partition, map.getOrDefault(partition, 0) + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
printDispersion(start, map, ITERATIONS);
|
checkDispersion(start, map, ITERATIONS, 1.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
@Test
|
@Test
|
||||||
public void testDispersionOnResolveByPartitionIdx() {
|
public void testDispersionOnResolveByPartitionIdx() {
|
||||||
int serverCount = 10;
|
int serverCount = 5;
|
||||||
int queueCount = 10000;
|
int tenantCount = 1000;
|
||||||
|
int queueCount = 3;
|
||||||
int partitionCount = 3;
|
int partitionCount = 3;
|
||||||
|
|
||||||
List<TransportProtos.ServiceInfo> services = new ArrayList<>();
|
List<TransportProtos.ServiceInfo> services = new ArrayList<>();
|
||||||
@ -126,28 +133,35 @@ public class HashPartitionServiceTest {
|
|||||||
Map<String, Integer> map = new HashMap<>();
|
Map<String, Integer> map = new HashMap<>();
|
||||||
services.forEach(s -> map.put(s.getServiceId(), 0));
|
services.forEach(s -> map.put(s.getServiceId(), 0));
|
||||||
|
|
||||||
for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
|
Random random = new Random();
|
||||||
for (int partition = 0; partition < partitionCount; partition++) {
|
long ts = new SimpleDateFormat("dd-MM-yyyy").parse("06-12-2016").getTime() - TimeUnit.DAYS.toMillis(tenantCount);
|
||||||
TopicPartitionInfo tpi = new TopicPartitionInfo("tb_rule_engine.queue_" + queueIndex, TenantId.SYS_TENANT_ID, partition, false);
|
for (int tenantIndex = 0; tenantIndex < tenantCount; tenantIndex++) {
|
||||||
TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, tpi);
|
TenantId tenantId = new TenantId(UUIDs.startOf(ts));
|
||||||
String serviceId = serviceInfo.getServiceId();
|
ts += TimeUnit.DAYS.toMillis(1) + random.nextInt(1000);
|
||||||
map.put(serviceId, map.get(serviceId) + 1);
|
for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
|
||||||
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "queue" + queueIndex, tenantId);
|
||||||
|
for (int partition = 0; partition < partitionCount; partition++) {
|
||||||
|
TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition);
|
||||||
|
String serviceId = serviceInfo.getServiceId();
|
||||||
|
map.put(serviceId, map.get(serviceId) + 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printDispersion(start, map, queueCount * partitionCount);
|
checkDispersion(start, map, tenantCount * queueCount * partitionCount, 10.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> void printDispersion(long start, Map<T, Integer> map, int iterations) {
|
private <T> void checkDispersion(long start, Map<T, Integer> map, int iterations, double maxDiffPercent) {
|
||||||
List<Map.Entry<T, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
|
List<Map.Entry<T, Integer>> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList());
|
||||||
long end = System.currentTimeMillis();
|
long end = System.currentTimeMillis();
|
||||||
double diff = (data.get(data.size() - 1).getValue() - data.get(0).getValue());
|
double ideal = ((double) iterations) / map.size();
|
||||||
double diffPercent = (diff / iterations) * 100.0;
|
double diff = Math.max(data.get(data.size() - 1).getValue() - ideal, ideal - data.get(0).getValue());
|
||||||
|
double diffPercent = (diff / ideal) * 100.0;
|
||||||
System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
|
System.out.println("Time: " + (end - start) + " Diff: " + diff + "(" + String.format("%f", diffPercent) + "%)");
|
||||||
Assert.assertTrue(diffPercent < 0.5);
|
|
||||||
for (Map.Entry<T, Integer> entry : data) {
|
for (Map.Entry<T, Integer> entry : data) {
|
||||||
System.out.println(entry.getKey() + ": " + entry.getValue());
|
System.out.println(entry.getKey() + ": " + entry.getValue());
|
||||||
}
|
}
|
||||||
|
Assert.assertTrue(diffPercent < maxDiffPercent);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -34,6 +34,7 @@ import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
|
|||||||
import org.thingsboard.server.queue.util.AfterStartUp;
|
import org.thingsboard.server.queue.util.AfterStartUp;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -253,12 +254,8 @@ public class HashPartitionService implements PartitionService {
|
|||||||
ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
|
ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
|
||||||
myPartitions = new ConcurrentHashMap<>();
|
myPartitions = new ConcurrentHashMap<>();
|
||||||
partitionSizesMap.forEach((queueKey, size) -> {
|
partitionSizesMap.forEach((queueKey, size) -> {
|
||||||
TenantId tenantId = queueKey.getTenantId();
|
|
||||||
String topic = partitionTopicsMap.get(queueKey);
|
|
||||||
|
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
TopicPartitionInfo tpi = new TopicPartitionInfo(topic, tenantId, i, false);
|
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
|
||||||
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), tpi);
|
|
||||||
if (currentService.equals(serviceInfo)) {
|
if (currentService.equals(serviceInfo)) {
|
||||||
myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
|
myPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
|
||||||
}
|
}
|
||||||
@ -438,14 +435,21 @@ public class HashPartitionService implements PartitionService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, TopicPartitionInfo tpi) {
|
protected ServiceInfo resolveByPartitionIdx(List<ServiceInfo> servers, QueueKey queueKey, int partition) {
|
||||||
if (servers == null || servers.isEmpty()) {
|
if (servers == null || servers.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
int hash = hashFunction.newHasher().putInt(tpi.hashCode()).hash().asInt();
|
if (!ServiceType.TB_RULE_ENGINE.equals(queueKey.getType()) || TenantId.SYS_TENANT_ID.equals(queueKey.getTenantId())) {
|
||||||
|
return servers.get(partition % servers.size());
|
||||||
|
} else {
|
||||||
|
int hash = hashFunction.newHasher().putLong(queueKey.getTenantId().getId().getMostSignificantBits())
|
||||||
|
.putLong(queueKey.getTenantId().getId().getLeastSignificantBits())
|
||||||
|
.putString(queueKey.getQueueName(), StandardCharsets.UTF_8)
|
||||||
|
.hash().asInt();
|
||||||
|
|
||||||
return servers.get(Math.abs(hash % servers.size()));
|
return servers.get(Math.abs((hash + partition) % servers.size()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HashFunction forName(String name) {
|
public static HashFunction forName(String name) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user