Always recalculate partitions after queue updates
This commit is contained in:
parent
4319024eed
commit
12af0cd757
@ -184,7 +184,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void updateQueues(List<QueueUpdateMsg> queueUpdateMsgs) {
|
private void updateQueues(List<QueueUpdateMsg> queueUpdateMsgs) {
|
||||||
boolean partitionsChanged = false;
|
|
||||||
for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) {
|
for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) {
|
||||||
log.info("Received queue update msg: [{}]", queueUpdateMsg);
|
log.info("Received queue update msg: [{}]", queueUpdateMsg);
|
||||||
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||||
@ -194,23 +193,14 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
|
||||||
Queue queue = queueService.findQueueById(tenantId, queueId);
|
Queue queue = queueService.findQueueById(tenantId, queueId);
|
||||||
|
|
||||||
var consumer = getConsumer(queueKey).orElseGet(() -> createConsumer(queueKey, queue));
|
getConsumer(queueKey).ifPresentOrElse(consumer -> consumer.update(queue),
|
||||||
Queue oldQueue = consumer.getQueue();
|
() -> createConsumer(queueKey, queue));
|
||||||
consumer.update(queue);
|
|
||||||
|
|
||||||
if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) {
|
|
||||||
partitionsChanged = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
partitionsChanged = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionsChanged) {
|
partitionService.updateQueues(queueUpdateMsgs);
|
||||||
partitionService.updateQueues(queueUpdateMsgs);
|
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(),
|
||||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(),
|
new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||||
new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteQueues(List<QueueDeleteMsg> queueDeleteMsgs) {
|
private void deleteQueues(List<QueueDeleteMsg> queueDeleteMsgs) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user