From 9b870126f117e76db03c32c3ad7ba2fe0e6ead12 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 10 Mar 2025 15:27:59 +0200 Subject: [PATCH 1/6] added util method --- .../CalculatedFieldEntityMessageProcessor.java | 4 ++-- .../CalculatedFieldManagerMessageProcessor.java | 10 +++++----- .../service/cf/ctx/state/CalculatedFieldCtx.java | 9 --------- .../java/org/thingsboard/script/api/tbel/TbUtils.java | 8 ++++++-- .../org/thingsboard/script/api/tbel/TbUtilsTest.java | 8 +++++++- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 9ab019097b..4ff630bc59 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -153,7 +153,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { - log.info("[{}] Processing CF telemetry msg.", msg.getEntityId()); + log.debug("[{}] Processing CF telemetry msg.", msg.getEntityId()); var proto = msg.getProto(); var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback()); @@ -168,7 +168,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException { - log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId()); + log.debug("[{}] Processing CF link telemetry msg.", msg.getEntityId()); var proto = msg.getProto(); var ctx = msg.getCtx(); var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 1418821081..b0185fd555 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -318,14 +318,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { EntityId entityId = msg.getEntityId(); - log.info("Received telemetry msg from entity [{}]", entityId); + log.debug("Received telemetry msg from entity [{}]", entityId); // 2 = 1 for CF processing + 1 for links processing MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback()); // process all cfs related to entity, or it's profile; var entityIdFields = getCalculatedFieldsByEntityId(entityId); var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) { - log.info("Pushing telemetry msg to specific actor [{}]", entityId); + log.debug("Pushing telemetry msg to specific actor [{}]", entityId); getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback)); } else { callback.onSuccess(); @@ -342,7 +342,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) { EntityId sourceEntityId = msg.getEntityId(); - log.info("Received linked telemetry msg from entity [{}]", sourceEntityId); + log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId); var proto = msg.getProto(); var linksList = proto.getLinksList(); for (var linkProto : linksList) { @@ -357,14 +357,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback); entityIds.forEach(entityId -> { - log.info("Pushing linked telemetry msg to specific actor [{}]", entityId); + log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId); getOrCreateActor(entityId).tell(newMsg); }); } else { msg.getCallback().onSuccess(); } } else { - log.info("Pushing linked telemetry msg to specific actor [{}]", targetEntityId); + log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId); var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback()); getOrCreateActor(targetEntityId).tell(newMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index e5a3d0e05e..c51aaa2e72 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -44,7 +43,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; @Data public class CalculatedFieldCtx { @@ -58,8 +56,6 @@ public class CalculatedFieldCtx { private final Map arguments; private final Map mainEntityArguments; private final Map> linkedEntityArguments; - - private final Map, String> referencedEntityKeys; private final List argNames; private Output output; private String expression; @@ -93,11 +89,6 @@ public class CalculatedFieldCtx { linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey()); } } - this.referencedEntityKeys = arguments.entrySet().stream() - .collect(Collectors.toMap( - entry -> new TbPair<>(entry.getValue().getRefEntityId() == null ? entityId : entry.getValue().getRefEntityId(), entry.getValue().getRefEntityKey()), - Map.Entry::getKey - )); this.argNames = new ArrayList<>(arguments.keySet()); this.output = configuration.getOutput(); this.expression = configuration.getExpression(); diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index ca3a8db02a..95fff48883 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -18,8 +18,6 @@ package org.thingsboard.script.api.tbel; import com.google.common.primitives.Bytes; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; -import org.mvel2.ConversionHandler; -import org.mvel2.DataConversion; import org.mvel2.ExecutionContext; import org.mvel2.ParserConfiguration; import org.mvel2.execution.ExecutionArrayList; @@ -259,6 +257,8 @@ public class TbUtils { float.class, int.class))); parserConfig.addImport("toInt", new MethodStub(TbUtils.class.getMethod("toInt", double.class))); + parserConfig.addImport("isNaN", new MethodStub(TbUtils.class.getMethod("isNaN", + double.class))); parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes", ExecutionContext.class, String.class))); parserConfig.addImport("hexToBytesArray", new MethodStub(TbUtils.class.getMethod("hexToBytesArray", @@ -1163,6 +1163,10 @@ public class TbUtils { return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).intValue(); } + public static boolean isNaN(double value) { + return Double.isNaN(value); + } + public static ExecutionHashMap toFlatMap(ExecutionContext ctx, Map json) { return toFlatMap(ctx, json, new ArrayList<>(), true); } diff --git a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java index 69987d3132..e3860ed89a 100644 --- a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java +++ b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java @@ -1138,12 +1138,18 @@ public class TbUtilsTest { } @Test - void toInt() { + public void toInt() { Assertions.assertEquals(1729, TbUtils.toInt(doubleVal)); Assertions.assertEquals(13, TbUtils.toInt(12.8)); Assertions.assertEquals(28, TbUtils.toInt(28.0)); } + @Test + public void isNaN() { + Assertions.assertFalse(TbUtils.isNaN(doubleVal)); + Assertions.assertTrue(TbUtils.isNaN(Double.NaN)); + } + private static List toList(byte[] data) { List result = new ArrayList<>(data.length); for (Byte b : data) { From 0e12e4af580b7dde555f6e9fbc641ffbe0f705e1 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 10 Mar 2025 16:29:24 +0200 Subject: [PATCH 2/6] renamed edqs service to tb-edqs --- docker/docker-compose.edqs.volumes.yml | 4 ++-- docker/docker-compose.edqs.yml | 12 ++++++------ docker/{edqs.env => tb-edqs.env} | 0 docker/{edqs => tb-edqs}/conf/logback.xml | 4 ++-- .../conf/edqs.conf => tb-edqs/conf/tb-edqs.conf} | 4 ++-- 5 files changed, 12 insertions(+), 12 deletions(-) rename docker/{edqs.env => tb-edqs.env} (100%) rename docker/{edqs => tb-edqs}/conf/logback.xml (91%) rename docker/{edqs/conf/edqs.conf => tb-edqs/conf/tb-edqs.conf} (88%) diff --git a/docker/docker-compose.edqs.volumes.yml b/docker/docker-compose.edqs.volumes.yml index 9d2ce946c8..89b4b3a59c 100644 --- a/docker/docker-compose.edqs.volumes.yml +++ b/docker/docker-compose.edqs.volumes.yml @@ -19,10 +19,10 @@ version: '3.0' services: tb-edqs-1: volumes: - - tb-edqs-log-volume:/var/log/edqs + - tb-edqs-log-volume:/var/log/tb-edqs tb-edqs-2: volumes: - - tb-edqs-log-volume:/var/log/edqs + - tb-edqs-log-volume:/var/log/tb-edqs volumes: tb-edqs-log-volume: diff --git a/docker/docker-compose.edqs.yml b/docker/docker-compose.edqs.yml index 67e9c987e3..6dd9606ee6 100644 --- a/docker/docker-compose.edqs.yml +++ b/docker/docker-compose.edqs.yml @@ -33,10 +33,10 @@ services: restart: always image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}" env_file: - - edqs.env + - tb-edqs.env volumes: - - ./edqs/conf:/usr/share/edqs/conf - - ./edqs/log:/var/log/edqs + - ./tb-edqs/conf:/usr/share/tb-edqs/conf + - ./tb-edqs/log:/var/log/tb-edqs ports: - "8080" depends_on: @@ -46,10 +46,10 @@ services: restart: always image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}" env_file: - - edqs.env + - tb-edqs.env volumes: - - ./edqs/conf:/usr/share/edqs/conf - - ./edqs/log:/var/log/edqs + - ./tb-edqs/conf:/usr/share/tb-edqs/conf + - ./tb-edqs/log:/var/log/tb-edqs ports: - "8080" depends_on: diff --git a/docker/edqs.env b/docker/tb-edqs.env similarity index 100% rename from docker/edqs.env rename to docker/tb-edqs.env diff --git a/docker/edqs/conf/logback.xml b/docker/tb-edqs/conf/logback.xml similarity index 91% rename from docker/edqs/conf/logback.xml rename to docker/tb-edqs/conf/logback.xml index 40481a8c35..14b5b0f04b 100644 --- a/docker/edqs/conf/logback.xml +++ b/docker/tb-edqs/conf/logback.xml @@ -21,10 +21,10 @@ - /var/log/edqs/${TB_SERVICE_ID}/tb-edqs.log + /var/log/tb-edqs/${TB_SERVICE_ID}/tb-edqs.log - /var/log/edqs/tb-edqs.%d{yyyy-MM-dd}.%i.log + /var/log/tb-edqs/tb-edqs.%d{yyyy-MM-dd}.%i.log 100MB 30 3GB diff --git a/docker/edqs/conf/edqs.conf b/docker/tb-edqs/conf/tb-edqs.conf similarity index 88% rename from docker/edqs/conf/edqs.conf rename to docker/tb-edqs/conf/tb-edqs.conf index 8c6b5d1826..a5a4e6a10c 100644 --- a/docker/edqs/conf/edqs.conf +++ b/docker/tb-edqs/conf/tb-edqs.conf @@ -14,9 +14,9 @@ # limitations under the License. # -export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/edqs/${TB_SERVICE_ID}-gc.log:time,uptime,level,tags:filecount=10,filesize=10M" +export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/tb-edqs/${TB_SERVICE_ID}-gc.log:time,uptime,level,tags:filecount=10,filesize=10M" export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError" export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark" export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10" export LOG_FILENAME=tb-edqs.out -export LOADER_PATH=/usr/share/edqs/conf +export LOADER_PATH=/usr/share/tb-edqs/conf From 2c4346b993c6dd6b9c53265e0570e61dd0646183 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 10 Mar 2025 16:46:55 +0200 Subject: [PATCH 3/6] moved init cf actor before init rule chains --- .../server/actors/tenant/TenantActor.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 4be2024680..ce8ad84682 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -91,6 +91,16 @@ public class TenantActor extends RuleChainManagerActor { isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); if (isRuleEngine) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + try { + //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0; + cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId), + () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, + () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), + () -> true); + } catch (Exception e) { + log.info("Failed to init CF Actor.", e); + cantFindTenant = true; + } try { if (getApiUsageState().isReExecEnabled()) { log.debug("[{}] Going to init rule chains", tenantId); @@ -98,11 +108,6 @@ public class TenantActor extends RuleChainManagerActor { } else { log.info("[{}] Skip init of the rule chains due to API limits", tenantId); } - //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0; - cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId), - () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, - () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), - () -> true); } catch (Exception e) { log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); cantFindTenant = true; From cfc5a1f9d5478d274b98ee1be9cdcd320bdcafb5 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 10 Mar 2025 17:08:11 +0200 Subject: [PATCH 4/6] added check for cf actor --- .../org/thingsboard/server/actors/tenant/TenantActor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index ce8ad84682..0bd116f806 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -99,7 +99,6 @@ public class TenantActor extends RuleChainManagerActor { () -> true); } catch (Exception e) { log.info("Failed to init CF Actor.", e); - cantFindTenant = true; } try { if (getApiUsageState().isReExecEnabled()) { @@ -190,6 +189,10 @@ public class TenantActor extends RuleChainManagerActor { } private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) { + if (cfActor == null) { + log.warn("[{}] CF Actor is not initialized.", tenantId); + return; + } if (priority) { cfActor.tellWithHighPriority(msg); } else { From 3d282836e7f4bd1f047fc6702a99ae501d4acb69 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 11 Mar 2025 16:46:25 +0200 Subject: [PATCH 5/6] CF Cluster mode fixes --- ...CalculatedFieldEntityMessageProcessor.java | 8 ++--- .../server/actors/tenant/TenantActor.java | 16 ++++++++- ...faultCalculatedFieldProcessingService.java | 4 ++- ...aultCalculatedFieldEntityProfileCache.java | 8 +++-- .../KafkaCalculatedFieldStateService.java | 10 ++++-- ...faultTbCalculatedFieldConsumerService.java | 8 +++-- .../queue/DefaultTbClusterService.java | 4 +-- .../DefaultTbRuleEngineConsumerService.java | 3 +- .../cf/CalculatedFieldPartitionChangeMsg.java | 2 -- .../queue/discovery/HashPartitionService.java | 34 +++++++++++++++---- .../queue/discovery/PartitionService.java | 2 -- .../server/queue/discovery/QueueKey.java | 3 -- .../discovery/event/PartitionChangeEvent.java | 4 +-- 13 files changed, 72 insertions(+), 34 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 4ff630bc59..31cb159229 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; @@ -34,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; @@ -74,7 +76,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM final EntityId entityId; final CalculatedFieldProcessingService cfService; final CalculatedFieldStateService cfStateService; - final int partition; TbActorCtx ctx; Map states = new HashMap<>(); @@ -85,7 +86,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM this.entityId = entityId; this.cfService = systemContext.getCalculatedFieldProcessingService(); this.cfStateService = systemContext.getCalculatedFieldStateService(); - this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId); } void init(TbActorCtx ctx) { @@ -93,8 +93,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } public void process(CalculatedFieldPartitionChangeMsg msg) { - if (!msg.getPartitions()[partition]) { - log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId); + if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { + log.info("[{}] Stopping entity actor due to change partition event.", entityId); ctx.stop(ctx.getSelf()); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 0bd116f806..f9eec324fc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -98,7 +98,7 @@ public class TenantActor extends RuleChainManagerActor { () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), () -> true); } catch (Exception e) { - log.info("Failed to init CF Actor.", e); + log.info("[{}] Failed to init CF Actor.", tenantId, e); } try { if (getApiUsageState().isReExecEnabled()) { @@ -259,11 +259,25 @@ public class TenantActor extends RuleChainManagerActor { ServiceType serviceType = msg.getServiceType(); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + if (cfActor == null) { + try { + //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0; + cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId), + () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, + () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), + () -> true); + } catch (Exception e) { + log.info("[{}] Failed to init CF Actor.", tenantId, e); + } + } if (!ruleChainsInitialized) { log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId); initRuleChains(); } } else { + if (cfActor != null) { + ctx.stop(cfActor.getActorId()); + } if (ruleChainsInitialized) { log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId); destroyRuleChains(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 27bc0120c4..e9a6cb09aa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -30,6 +30,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; @@ -51,6 +52,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; @@ -200,7 +202,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP if (broadcast) { broadcasts.add(link); } else { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, link.tenantId(), link.entityId()); unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java index 4cf62c01b3..2f5772ae50 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java @@ -18,8 +18,10 @@ package org.thingsboard.server.service.cf.cache; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; @@ -57,7 +59,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()) .add(profileId, entityId, partition, tpi.isMyPartition()); @@ -65,7 +67,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); var partition = tpi.getPartition().orElse(UNKNOWN); var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); //TODO: make this method atomic; @@ -86,7 +88,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { - var tpi = partitionService.resolve(QueueKey.CF, entityId); + var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); return tpi.getPartition().orElse(UNKNOWN); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index e81fa4d1dc..557768e9c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -20,10 +20,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; @@ -67,9 +69,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { super.init(eventConsumer); + + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME); this.stateConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(QueueKey.CF_STATES) - .topic(partitionService.getTopic(QueueKey.CF_STATES)) + .queueKey(queueKey) + .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg msg : msgs) { @@ -101,7 +105,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta @Override protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index e76a2be9be..b5c755ad4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -27,6 +27,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; @@ -108,9 +109,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer public void init() { super.init("tb-cf"); + var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); this.eventConsumer = PartitionedQueueConsumerManager.>create() - .queueKey(QueueKey.CF) - .topic(partitionService.getTopic(QueueKey.CF)) + .queueKey(queueKey) + .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) .msgPackProcessor(this::processMsgs) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) @@ -140,7 +142,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer // Cleanup old entities after corresponding consumers are stopped. // Any periodic tasks need to check that the entity is still managed by the current server before processing. - actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); + actorContext.tell(new CalculatedFieldPartitionChangeMsg()); } catch (Throwable t) { log.error("Failed to process partition change event: {}", event, t); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 800578ace4..c7174469b0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback); } @@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); toRuleEngineNfs.incrementAndGet(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 6391beecf8..d522f11f7b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -22,6 +22,7 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; @@ -108,7 +109,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { event.getNewPartitions().forEach((queueKey, partitions) -> { - if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) { + if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) { return; } if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java index 38a4853219..44756013ca 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldPartitionChangeMsg.java @@ -26,8 +26,6 @@ import java.util.Set; @Data public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg { - private final boolean[] partitions; - @Override public TenantId getTenantId() { return TenantId.SYS_TENANT_ID; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index b42f8cc380..eaa33e99d9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -24,6 +24,7 @@ import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -122,11 +123,6 @@ public class HashPartitionService implements PartitionService { partitionSizesMap.put(coreKey, corePartitions); partitionTopicsMap.put(coreKey, coreTopic); - partitionSizesMap.put(QueueKey.CF, cfPartitions); - partitionTopicsMap.put(QueueKey.CF, cfEventTopic); - partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions); - partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic); - QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR); partitionSizesMap.put(vcKey, vcPartitions); partitionTopicsMap.put(vcKey, vcTopic); @@ -165,6 +161,14 @@ public class HashPartitionService implements PartitionService { List queueRoutingInfoList = getQueueRoutingInfos(); queueRoutingInfoList.forEach(queue -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.put(cfQueueKey, queue.getPartitions()); + partitionTopicsMap.put(cfQueueKey, cfEventTopic); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions()); + partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); + } partitionTopicsMap.put(queueKey, queue.getQueueTopic()); partitionSizesMap.put(queueKey, queue.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queue)); @@ -213,6 +217,14 @@ public class HashPartitionService implements PartitionService { QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg); TenantId tenantId = queueRoutingInfo.getTenantId(); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId); + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions()); + partitionTopicsMap.put(cfQueueKey, cfEventTopic); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions()); + partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic); + } partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic()); partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions()); queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo)); @@ -252,6 +264,15 @@ public class HashPartitionService implements PartitionService { partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); queueConfigs.remove(queueKey); + + if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) { + QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME); + partitionSizesMap.remove(cfQueueKey); + partitionTopicsMap.remove(cfQueueKey); + QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME); + partitionSizesMap.remove(cfQueueStatesKey); + partitionTopicsMap.remove(cfQueueStatesKey); + } } @Override @@ -336,8 +357,7 @@ public class HashPartitionService implements PartitionService { } } - @Override - public TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { + private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { Integer partitionSize = partitionSizesMap.get(queueKey); if (partitionSize == null) { throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 404b0258c0..c7d5bd7acf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -37,8 +37,6 @@ public interface PartitionService { TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId); - TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId); - List resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId); boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java index ca38959fdd..6720a9d71e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/QueueKey.java @@ -35,9 +35,6 @@ public class QueueKey { private final String queueName; private final TenantId tenantId; - public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME); - public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME); - public QueueKey(ServiceType type, Queue queue) { this.type = type; this.queueName = queue.getName(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java index 597463300a..f165f60be7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/event/PartitionChangeEvent.java @@ -64,10 +64,10 @@ public class PartitionChangeEvent extends TbApplicationEvent { } public Set getCfPartitions() { - return newPartitions.getOrDefault(QueueKey.CF, Collections.emptySet()); + return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME); } - private Set getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { + public Set getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { return newPartitions.entrySet() .stream() .filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName())) From 55f9f663126d7ff63e55b9b09e35b3ae0dc7469f Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 11 Mar 2025 16:55:58 +0200 Subject: [PATCH 6/6] CF parameters cleanup --- .../queue/DefaultTbCalculatedFieldConsumerService.java | 10 ---------- application/src/main/resources/thingsboard.yml | 5 +---- .../server/queue/discovery/HashPartitionService.java | 7 ------- .../server/queue/discovery/PartitionService.java | 2 -- 4 files changed, 1 insertion(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index b5c755ad4b..ad248cc3d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -80,8 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private long pollInterval; @Value("${queue.calculated_fields.pack_processing_timeout:60000}") private long packProcessingTimeout; - @Value("${queue.calculated_fields.pool_size:8}") - private int poolSize; private final TbRuleEngineQueueFactory queueFactory; private final CalculatedFieldStateService stateService; @@ -148,14 +146,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer } } - private boolean[] partitionsToBooleanIndexArray(Set partitions) { - boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()]; - for (var tpi : partitions) { - tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true); - } - return myPartitions; - } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index badd779350..3cc28ad702 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1759,7 +1759,6 @@ queue: enabled: "${TB_EDQS_STATS_ENABLED:true}" # Statistics printing interval for EDQS print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}" - vc: # Default topic name topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" @@ -1816,9 +1815,7 @@ queue: # For high-priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}" # Interval in milliseconds to poll messages by CF (Rule Engine) microservices - poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}" - # Amount of partitions used by CF microservices - partitions: "${TB_QUEUE_CF_PARTITIONS:10}" + poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:1000}" # Timeout for processing a message pack by CF microservices pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}" # Thread pool size for processing of the incoming messages diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index eaa33e99d9..345b44e764 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService { private String cfEventTopic; @Value("${queue.calculated_fields.state_topic:tb_cf_state}") private String cfStateTopic; - @Value("${queue.calculated_fields.partitions:10}") - private Integer cfPartitions; @Value("${queue.vc.topic:tb_version_control}") private String vcTopic; @Value("${queue.vc.partitions:10}") @@ -572,11 +570,6 @@ public class HashPartitionService implements PartitionService { return list == null ? 0 : list.size(); } - @Override - public int getTotalCalculatedFieldPartitions() { - return cfPartitions; - } - private Map> getServiceKeyListMap(List services) { final Map> currentMap = new HashMap<>(); services.forEach(serviceInfo -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index c7d5bd7acf..7abd68e25f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -79,6 +79,4 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); - int getTotalCalculatedFieldPartitions(); - }