diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 4ff630bc59..31cb159229 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; @@ -34,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; @@ -74,7 +76,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM final EntityId entityId; final CalculatedFieldProcessingService cfService; final CalculatedFieldStateService cfStateService; - final int partition; TbActorCtx ctx; Map states = new HashMap<>(); @@ -85,7 +86,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM this.entityId = entityId; this.cfService = systemContext.getCalculatedFieldProcessingService(); this.cfStateService = systemContext.getCalculatedFieldStateService(); - this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId); } void init(TbActorCtx ctx) { @@ -93,8 +93,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldPartitionChangeMsg msg) { - if (!msg.getPartitions()[partition]) { - log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId); + if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { + log.info("[{}] Stopping entity actor due to change partition event.", entityId); ctx.stop(ctx.getSelf()); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 0bd116f806..f9eec324fc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -98,7 +98,7 @@ public class TenantActor extends RuleChainManagerActor { () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), () -> true); } catch (Exception e) { - log.info("Failed to init CF Actor.", e); + log.info("[{}] Failed to init CF Actor.", tenantId, e); } try { if (getApiUsageState().isReExecEnabled()) { @@ -259,11 +259,25 @@ public class TenantActor extends RuleChainManagerActor { ServiceType serviceType = msg.getServiceType(); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + if (cfActor == null) { + try { + //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0; + cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId), + () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, + () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), + () -> true); + } catch (Exception e) { + log.info("[{}] Failed to init CF Actor.", tenantId, e); + } + } if (!ruleChainsInitialized) { log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId); initRuleChains(); } } else { + if (cfActor != null) { + ctx.stop(cfActor.getActorId()); + } if (ruleChainsInitialized) { log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId); destroyRuleChains(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 27bc0120c4..e9a6cb09aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -30,6 +30,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; @@ -51,6 +52,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; @@ -200,7 +202,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP if (broadcast) { broadcasts.add(link); } else { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, link.tenantId(), link.entityId()); unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java index 4cf62c01b3..2f5772ae50 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java @@ -18,8 +18,10 @@ package org.thingsboard.server.service.cf.cache; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; @@ -57,7 +59,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()) .add(profileId, entityId, partition, tpi.isMyPartition()); @@ -65,7 +67,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); //TODO: make this method atomic; @@ -86,7 +88,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); return tpi.getPartition().orElse(UNKNOWN); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index e81fa4d1dc..557768e9c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -20,10 +20,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; @@ -67,9 +69,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { super.init(eventConsumer); + + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); this.stateConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(QueueKey.CF_STATES) - .topic(partitionService.getTopic(QueueKey.CF_STATES)) + .queueKey(queueKey) + .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg msg : msgs) { @@ -101,7 +105,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index e76a2be9be..b5c755ad4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -27,6 +27,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; @@ -108,9 +109,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer public void init() { super.init("tb-cf"); + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); this.eventConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(QueueKey.CF) - .topic(partitionService.getTopic(QueueKey.CF)) + .queueKey(queueKey) + .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) @@ -140,7 +142,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer // Cleanup old entities after corresponding consumers are stopped. // Any periodic tasks need to check that the entity is still managed by the current server before processing. - actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + actorContext.tell(new CalculatedFieldPartitionChangeMsg()); } catch (Throwable t) { log.error("Failed to process partition change event: {}", event, t); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 800578ace4..c7174469b0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback); } @@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); toRuleEngineNfs.incrementAndGet(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 6391beecf8..d522f11f7b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -22,6 +22,7 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; @@ -108,7 +109,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { event.getNewPartitions().forEach((queueKey, partitions) -> { - if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) { + if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) { return; } if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java index 38a4853219..44756013ca 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java @@ -26,8 +26,6 @@ import java.util.Set; @Data public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg { - private final boolean[] partitions; - @Override public TenantId getTenantId() { return TenantId.SYS_TENANT_ID; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index b42f8cc380..eaa33e99d9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -24,6 +24,7 @@ import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -122,11 +123,6 @@ public class HashPartitionService implements PartitionService { partitionSizesMap.put(coreKey, corePartitions); partitionTopicsMap.put(coreKey, coreTopic); - partitionSizesMap.put(QueueKey.CF, cfPartitions); - partitionTopicsMap.put(QueueKey.CF, cfEventTopic); - partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions); - partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic); - QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR); partitionSizesMap.put(vcKey, vcPartitions); partitionTopicsMap.put(vcKey, vcTopic); @@ -165,6 +161,14 @@ public class HashPartitionService implements PartitionService { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.put(cfQueueKey, queue.getPartitions()); + partitionTopicsMap.put(cfQueueKey, cfEventTopic); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions()); + partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); + } partitionTopicsMap.put(queueKey, queue.getQueueTopic()); partitionSizesMap.put(queueKey, queue.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queue)); @@ -213,6 +217,14 @@ public class HashPartitionService implements PartitionService { QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg); TenantId tenantId = queueRoutingInfo.getTenantId(); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId); + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions()); + partitionTopicsMap.put(cfQueueKey, cfEventTopic); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions()); + partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); + } partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic()); partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo)); @@ -252,6 +264,15 @@ public class HashPartitionService implements PartitionService { partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); queueConfigs.remove(queueKey); + + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.remove(cfQueueKey); + partitionTopicsMap.remove(cfQueueKey); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.remove(cfQueueStatesKey); + partitionTopicsMap.remove(cfQueueStatesKey); + } } @Override @@ -336,8 +357,7 @@ public class HashPartitionService implements PartitionService { } } - @Override - public TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { + private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { Integer partitionSize = partitionSizesMap.get(queueKey); if (partitionSize == null) { throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 404b0258c0..c7d5bd7acf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -37,8 +37,6 @@ public interface PartitionService { TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId); - TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId); - List resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId); boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java index ca38959fdd..6720a9d71e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java @@ -35,9 +35,6 @@ public class QueueKey { private final String queueName; private final TenantId tenantId; - public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME); - public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME); - public QueueKey(ServiceType type, Queue queue) { this.type = type; this.queueName = queue.getName(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index 597463300a..f165f60be7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -64,10 +64,10 @@ public class PartitionChangeEvent extends TbApplicationEvent { } public Set getCfPartitions() { - return newPartitions.getOrDefault(QueueKey.CF, Collections.emptySet()); + return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); } - private Set getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { + public Set getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { return newPartitions.entrySet() .stream() .filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName()))