Queue removal: always publish PartitionChangeEvent; remove redundant recalculatePartitions; add debug log
This commit is contained in:
parent
0d8a2549d9
commit
4e526aa969
@ -188,7 +188,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions);
|
log.info("[{}] Subscribing to partitions: {}", serviceQueue, partitions);
|
||||||
Queue configuration = consumerConfigurations.get(queueKey);
|
Queue configuration = consumerConfigurations.get(queueKey);
|
||||||
if (configuration == null) {
|
if (configuration == null) {
|
||||||
log.warn("Received invalid partition change event for {} that is not managed by this service", queueKey);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!configuration.isConsumerPerPartition()) {
|
if (!configuration.isConsumerPerPartition()) {
|
||||||
@ -503,7 +502,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
partitionService.recalculatePartitions(serviceInfoProvider.getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
||||||
|
|||||||
@ -179,10 +179,15 @@ public class HashPartitionService implements PartitionService {
|
|||||||
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
|
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
|
||||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||||
|
myPartitions.remove(queueKey);
|
||||||
partitionTopicsMap.remove(queueKey);
|
partitionTopicsMap.remove(queueKey);
|
||||||
partitionSizesMap.remove(queueKey);
|
partitionSizesMap.remove(queueKey);
|
||||||
//TODO: remove after merging tb entity services
|
//TODO: remove after merging tb entity services
|
||||||
removeTenant(tenantId);
|
removeTenant(tenantId);
|
||||||
|
|
||||||
|
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
|
||||||
|
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -306,9 +311,7 @@ public class HashPartitionService implements PartitionService {
|
|||||||
partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
|
partitionsByServiceType.computeIfAbsent(queueKey.getType(), serviceType -> new HashMap<>())
|
||||||
.put(queueKey, partitions);
|
.put(queueKey, partitions);
|
||||||
});
|
});
|
||||||
partitionsByServiceType.forEach((serviceType, partitionsMap) -> {
|
partitionsByServiceType.forEach(this::publishPartitionChangeEvent);
|
||||||
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentOtherServices == null) {
|
if (currentOtherServices == null) {
|
||||||
@ -340,6 +343,18 @@ public class HashPartitionService implements PartitionService {
|
|||||||
applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService));
|
applicationEventPublisher.publishEvent(new ServiceListChangedEvent(otherServices, currentService));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void publishPartitionChangeEvent(ServiceType serviceType, Map<QueueKey, Set<TopicPartitionInfo>> partitionsMap) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() +
|
||||||
|
partitionsMap.entrySet().stream()
|
||||||
|
.map(entry -> entry.getKey() + " - " + entry.getValue().stream()
|
||||||
|
.map(TopicPartitionInfo::getFullTopicName).sorted()
|
||||||
|
.collect(Collectors.toList()))
|
||||||
|
.collect(Collectors.joining(System.lineSeparator())));
|
||||||
|
}
|
||||||
|
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getAllServiceIds(ServiceType serviceType) {
|
public Set<String> getAllServiceIds(ServiceType serviceType) {
|
||||||
return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet());
|
return getAllServices(serviceType).stream().map(ServiceInfo::getServiceId).collect(Collectors.toSet());
|
||||||
@ -505,6 +520,9 @@ public class HashPartitionService implements PartitionService {
|
|||||||
}
|
}
|
||||||
responsibleServices.put(profileId, responsible);
|
responsibleServices.put(profileId, responsible);
|
||||||
}
|
}
|
||||||
|
if (responsible.isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
servers = responsible;
|
servers = responsible;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user