Merge pull request #9257 from thingsboard/improvements/recalculate-partitions

PartitionService improvements
This commit is contained in:
Andrew Shvayka 2023-09-18 16:14:56 +03:00 committed by GitHub
commit efc44b2fc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 44 deletions

View File

@ -121,7 +121,7 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.info("Received partition change event."); log.info("Received partition change event.");
this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getQueueKey().getType(), event.getPartitions())); this.appActor.tellWithHighPriority(new PartitionChangeMsg(event.getServiceType()));
} }
@PreDestroy @PreDestroy

View File

@ -183,19 +183,20 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
if (event.getServiceType().equals(getServiceType())) { if (event.getServiceType().equals(getServiceType())) {
String serviceQueue = event.getQueueKey().getQueueName(); event.getPartitionsMap().forEach((queueKey, partitions) -> {
log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions()); String serviceQueue = queueKey.getQueueName();
Queue configuration = consumerConfigurations.get(event.getQueueKey()); log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions);
Queue configuration = consumerConfigurations.get(queueKey);
if (configuration == null) { if (configuration == null) {
log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
return; return;
} }
if (!configuration.isConsumerPerPartition()) { if (!configuration.isConsumerPerPartition()) {
consumers.get(event.getQueueKey()).subscribe(event.getPartitions()); consumers.get(queueKey).subscribe(partitions);
} else { } else {
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions()); log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, partitions);
subscribeConsumerPerPartition(event.getQueueKey(), event.getPartitions()); subscribeConsumerPerPartition(queueKey, partitions);
} }
});
} }
} }
@ -501,7 +502,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
} }
} }
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
} }
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {

View File

@ -286,10 +286,8 @@ public class HashPartitionServiceTest {
HashPartitionService partitionService_common = createPartitionService(); HashPartitionService partitionService_common = createPartitionService();
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
verifyPartitionChangeEvent(event -> { verifyPartitionChangeEvent(event -> {
return event.getQueueKey().getTenantId().isSysTenantId() && QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, TenantId.SYS_TENANT_ID);
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && return event.getPartitionsMap().get(queueKey).size() == systemQueue.getPartitions();
event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
.size() == systemQueue.getPartitions();
}); });
Mockito.reset(applicationEventPublisher); Mockito.reset(applicationEventPublisher);
@ -316,18 +314,15 @@ public class HashPartitionServiceTest {
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
// expecting event about no partitions for isolated queue key // expecting event about no partitions for isolated queue key
verifyPartitionChangeEvent(event -> { verifyPartitionChangeEvent(event -> {
return event.getQueueKey().getTenantId().equals(tenantId) && QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && return event.getPartitionsMap().get(queueKey).isEmpty();
event.getPartitions().isEmpty();
}); });
partitionService_dedicated.updateQueue(queueUpdateMsg); partitionService_dedicated.updateQueue(queueUpdateMsg);
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
verifyPartitionChangeEvent(event -> { verifyPartitionChangeEvent(event -> {
return event.getQueueKey().getTenantId().equals(tenantId) && QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && return event.getPartitionsMap().get(queueKey).size() == isolatedQueue.getPartitions();
event.getPartitions().stream().map(TopicPartitionInfo::getPartition).collect(Collectors.toSet())
.size() == isolatedQueue.getPartitions();
}); });
@ -343,11 +338,9 @@ public class HashPartitionServiceTest {
.setQueueName(isolatedQueue.getName()) .setQueueName(isolatedQueue.getName())
.build(); .build();
partitionService_dedicated.removeQueue(queueDeleteMsg); partitionService_dedicated.removeQueue(queueDeleteMsg);
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
verifyPartitionChangeEvent(event -> { verifyPartitionChangeEvent(event -> {
return event.getQueueKey().getTenantId().equals(tenantId) && QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
event.getQueueKey().getQueueName().equals(DataConstants.MAIN_QUEUE_NAME) && return event.getPartitionsMap().get(queueKey).isEmpty();
event.getPartitions().isEmpty();
}); });
} }

View File

@ -160,7 +160,9 @@ public class DefaultDeviceStateServiceTest {
Mockito.reset(service, telemetrySubscriptionService); Mockito.reset(service, telemetrySubscriptionService);
ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout); ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout);
service.init(); service.init();
PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)); PartitionChangeEvent event = new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(
new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)
));
service.onApplicationEvent(event); service.onApplicationEvent(event);
Thread.sleep(100); Thread.sleep(100);
} }

View File

@ -20,8 +20,6 @@ import lombok.Getter;
import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorMsg;
import java.util.Set;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@ -30,8 +28,6 @@ public final class PartitionChangeMsg implements TbActorMsg {
@Getter @Getter
private final ServiceType serviceType; private final ServiceType serviceType;
@Getter
private final Set<TopicPartitionInfo> partitions;
@Override @Override
public MsgType getMsgType() { public MsgType getMsgType() {

View File

@ -179,10 +179,15 @@ public class HashPartitionService implements PartitionService {
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
myPartitions.remove(queueKey);
partitionTopicsMap.remove(queueKey); partitionTopicsMap.remove(queueKey);
partitionSizesMap.remove(queueKey); partitionSizesMap.remove(queueKey);
//TODO: remove after merging tb entity services //TODO: remove after merging tb entity services
removeTenant(tenantId); removeTenant(tenantId);
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
}
} }
@Override @Override
@ -271,6 +276,8 @@ public class HashPartitionService implements PartitionService {
final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions; final ConcurrentMap<QueueKey, List<Integer>> oldPartitions = myPartitions;
myPartitions = newPartitions; myPartitions = newPartitions;
Map<QueueKey, Set<TopicPartitionInfo>> changedPartitionsMap = new HashMap<>();
Set<QueueKey> removed = new HashSet<>(); Set<QueueKey> removed = new HashSet<>();
oldPartitions.forEach((queueKey, partitions) -> { oldPartitions.forEach((queueKey, partitions) -> {
if (!newPartitions.containsKey(queueKey)) { if (!newPartitions.containsKey(queueKey)) {
@ -286,7 +293,7 @@ public class HashPartitionService implements PartitionService {
} }
removed.forEach(queueKey -> { removed.forEach(queueKey -> {
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey);
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, Collections.emptySet())); changedPartitionsMap.put(queueKey, Collections.emptySet());
}); });
myPartitions.forEach((queueKey, partitions) -> { myPartitions.forEach((queueKey, partitions) -> {
@ -295,9 +302,17 @@ public class HashPartitionService implements PartitionService {
Set<TopicPartitionInfo> tpiList = partitions.stream() Set<TopicPartitionInfo> tpiList = partitions.stream()
.map(partition -> buildTopicPartitionInfo(queueKey, partition)) .map(partition -> buildTopicPartitionInfo(queueKey, partition))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, queueKey, tpiList)); changedPartitionsMap.put(queueKey, tpiList);
} }
}); });
if (!changedPartitionsMap.isEmpty()) {
Map<ServiceType, Map<QueueKey, Set<TopicPartitionInfo>>> partitionsByServiceType = new HashMap<>();
changedPartitionsMap.forEach((queueKey, partitions) -> {
partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
.put(queueKey, partitions);
});
partitionsByServiceType.forEach(this::publishPartitionChangeEvent);
}
if (currentOtherServices == null) { if (currentOtherServices == null) {
currentOtherServices = new ArrayList<>(otherServices); currentOtherServices = new ArrayList<>(otherServices);
@ -328,6 +343,18 @@ public class HashPartitionService implements PartitionService {
applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService)); applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService));
} }
private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
if (log.isDebugEnabled()) {
log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() +
partitionsMap.entrySet().stream()
.map(entry -> entry.getKey() + " - " + entry.getValue().stream()
.map(TopicPartitionInfo::getFullTopicName).sorted()
.collect(Collectors.toList()))
.collect(Collectors.joining(System.lineSeparator())));
}
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
}
@Override @Override
public Set<String> getAllServiceIds(ServiceType serviceType) { public Set<String> getAllServiceIds(ServiceType serviceType) {
return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet()); return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet());
@ -493,6 +520,9 @@ public class HashPartitionService implements PartitionService {
} }
responsibleServices.put(profileId, responsible); responsibleServices.put(profileId, responsible);
} }
if (responsible.isEmpty()) {
return null;
}
servers = responsible; servers = responsible;
} }

View File

@ -21,6 +21,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.QueueKey;
import java.util.Collections;
import java.util.Map;
import java.util.Set; import java.util.Set;
@ToString(callSuper = true) @ToString(callSuper = true)
@ -29,17 +31,19 @@ public class PartitionChangeEvent extends TbApplicationEvent {
private static final long serialVersionUID = -8731788167026510559L; private static final long serialVersionUID = -8731788167026510559L;
@Getter @Getter
private final QueueKey queueKey; private final ServiceType serviceType;
@Getter @Getter
private final Set<TopicPartitionInfo> partitions; private final Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap;
public PartitionChangeEvent(Object source, QueueKey queueKey, Set<TopicPartitionInfo> partitions) { public PartitionChangeEvent(Object source, ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
super(source); super(source);
this.queueKey = queueKey; this.serviceType = serviceType;
this.partitions = partitions; this.partitionsMap = partitionsMap;
} }
public ServiceType getServiceType() { // only for service types that have single QueueKey
return queueKey.getType(); public Set<TopicPartitionInfo> getPartitions() {
return partitionsMap.values().stream().findAny().orElse(Collections.emptySet());
} }
} }