Distribute partitions between Rule Engines based only on tenant id

This commit is contained in:
ViacheslavKlimov 2023-10-27 14:43:12 +03:00
parent 9d6ed79123
commit 9e408cd6f3
2 changed files with 30 additions and 1 deletions

View File

@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -365,6 +366,35 @@ public class HashPartitionServiceTest {
assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue();
}
@Test
public void testPartitionsDistribution_sameTenantDifferentQueues() {
List<ServiceInfo> ruleEngines = new ArrayList<>();
int serviceId = 0;
for (int i = 0; i < 5; i++) {
ServiceInfo commonServer = ServiceInfo.newBuilder()
.setServiceId("tb-rule-engine-" + serviceId)
.addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name()))
.build();
ruleEngines.add(commonServer);
serviceId++;
}
Stream.concat(Stream.of(TenantId.SYS_TENANT_ID), Stream.generate(UUID::randomUUID).map(TenantId::new).limit(10)).forEach(tenantId -> {
List<QueueKey> queues = Stream.generate(() -> RandomStringUtils.randomAlphabetic(10))
.map(queueName -> new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId))
.limit(100).collect(Collectors.toList());
for (int partition = 0; partition < 10; partition++) {
ServiceInfo expectedAssignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId), partition);
for (QueueKey queueKey : queues) {
ServiceInfo assignedRuleEngine = clusterRoutingService.resolveByPartitionIdx(ruleEngines, queueKey, partition);
assertThat(assignedRuleEngine).as(queueKey + "[" + partition + "] should be assigned to " + expectedAssignedRuleEngine.getServiceId())
.isEqualTo(expectedAssignedRuleEngine);
}
}
});
}
private void verifyPartitionChangeEvent(Predicate<PartitionChangeEvent> predicate) {
verify(applicationEventPublisher).publishEvent(argThat(event -> event instanceof PartitionChangeEvent && predicate.test((PartitionChangeEvent) event)));
}

View File

@ -529,7 +529,6 @@ public class HashPartitionService implements PartitionService {
int hash = hashFunction.newHasher()
.putLong(tenantId.getId().getMostSignificantBits())
.putLong(tenantId.getId().getLeastSignificantBits())
.putString(queueKey.getQueueName(), StandardCharsets.UTF_8)
.hash().asInt();
return servers.get(Math.abs((hash + partition) % servers.size()));
} else {