Fix Rule Engine consumers init when tenant becomes managed

This commit is contained in:
ViacheslavKlimov 2024-02-13 13:49:14 +02:00
parent 492dc5916e
commit e806c1c86e
3 changed files with 24 additions and 11 deletions

View File

@ -193,7 +193,9 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
boolean result = ruleNode == null || !ruleNode.isSingletonMode() boolean result = ruleNode == null || !ruleNode.isSingletonMode()
|| systemContext.getDiscoveryService().isMonolith() || systemContext.getDiscoveryService().isMonolith()
|| defaultCtx.isLocalEntity(ruleNode.getId()); || defaultCtx.isLocalEntity(ruleNode.getId());
if (!result) {
log.trace("[{}][{}] Is not my node partition", tenantId, entityId); log.trace("[{}][{}] Is not my node partition", tenantId, entityId);
}
return result; return result;
} }

View File

@ -183,7 +183,7 @@ public class TenantActor extends RuleChainManagerActor {
return; return;
} }
TbMsg tbMsg = msg.getMsg(); TbMsg tbMsg = msg.getMsg();
if (getApiUsageState().isReExecEnabled()) { if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
if (tbMsg.getRuleChainId() == null) { if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) { if (getRootChainActor() != null) {
getRootChainActor().tell(msg); getRootChainActor().tell(msg);
@ -207,7 +207,7 @@ public class TenantActor extends RuleChainManagerActor {
} }
private void onRuleChainMsg(RuleChainAwareMsg msg) { private void onRuleChainMsg(RuleChainAwareMsg msg) {
if (getApiUsageState().isReExecEnabled()) { if (getApiUsageState().isReExecEnabled() && ruleChainsInitialized) {
getOrCreateActor(msg.getRuleChainId()).tell(msg); getOrCreateActor(msg.getRuleChainId()).tell(msg);
} }
} }
@ -233,7 +233,7 @@ public class TenantActor extends RuleChainManagerActor {
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
if (!ruleChainsInitialized) { if (!ruleChainsInitialized) {
log.info("Tenant {} is already managed by this service, initializing rule chains", tenantId); log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
initRuleChains(); initRuleChains();
} }
} else { } else {

View File

@ -100,7 +100,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
for (Queue configuration : queues) { for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration);
getOrCreateConsumer(queueKey).init(configuration); createConsumer(queueKey, configuration);
} }
} }
} }
@ -109,7 +109,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
event.getPartitionsMap().forEach((queueKey, partitions) -> { event.getPartitionsMap().forEach((queueKey, partitions) -> {
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {
getOrCreateConsumer(queueKey).update(partitions); var consumer = getConsumer(queueKey).orElseGet(() -> {
Queue config = queueService.findQueueByTenantIdAndName(queueKey.getTenantId(), queueKey.getQueueName());
return createConsumer(queueKey, config);
});
consumer.update(partitions);
} }
}); });
consumers.keySet().stream() consumers.keySet().stream()
@ -190,9 +194,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
Queue queue = queueService.findQueueById(tenantId, queueId); Queue queue = queueService.findQueueById(tenantId, queueId);
TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); var consumer = getConsumer(queueKey).orElseGet(() -> createConsumer(queueKey, queue));
Queue oldQueue = consumerManager.getQueue(); Queue oldQueue = consumer.getQueue();
consumerManager.update(queue); consumer.update(queue);
if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) { if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) {
partitionsChanged = true; partitionsChanged = true;
@ -235,8 +239,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
} }
private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { private Optional<TbRuleEngineQueueConsumerManager> getConsumer(QueueKey queueKey) {
return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); return Optional.ofNullable(consumers.get(queueKey));
}
private TbRuleEngineQueueConsumerManager createConsumer(QueueKey queueKey, Queue queue) {
var consumer = new TbRuleEngineQueueConsumerManager(ctx, queueKey);
consumers.put(queueKey, consumer);
consumer.init(queue);
return consumer;
} }
private Optional<TbRuleEngineQueueConsumerManager> removeConsumer(QueueKey queueKey) { private Optional<TbRuleEngineQueueConsumerManager> removeConsumer(QueueKey queueKey) {