Merge pull request #10394 from thingsboard/fix/queue-deletion-event
Ignore partition change event for deleted queues
This commit is contained in:
commit
ac430e9a80
@ -114,10 +114,18 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
|
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
|
||||||
var consumer = getConsumer(queueKey).orElseGet(() -> {
|
var consumer = getConsumer(queueKey).orElseGet(() -> {
|
||||||
Queue config = queueService.findQueueByTenantIdAndName(queueKey.getTenantId(), queueKey.getQueueName());
|
Queue config = queueService.findQueueByTenantIdAndName(queueKey.getTenantId(), queueKey.getQueueName());
|
||||||
|
if (config == null) {
|
||||||
|
if (!partitions.isEmpty()) {
|
||||||
|
log.error("[{}] Queue configuration is missing", queueKey, new RuntimeException("stacktrace"));
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
return createConsumer(queueKey, config);
|
return createConsumer(queueKey, config);
|
||||||
});
|
});
|
||||||
|
if (consumer != null) {
|
||||||
consumer.update(partitions);
|
consumer.update(partitions);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
consumers.keySet().stream()
|
consumers.keySet().stream()
|
||||||
.collect(Collectors.groupingBy(QueueKey::getTenantId))
|
.collect(Collectors.groupingBy(QueueKey::getTenantId))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user