From a2382c206f10d84c17d0336bb9cc086ecf19d549 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 23 May 2025 16:52:03 +0300 Subject: [PATCH 1/8] fixes for performance --- .../server/actors/ActorSystemContext.java | 4 -- .../cf/DefaultCalculatedFieldCache.java | 29 ++++++++----- .../DefaultCalculatedFieldQueueService.java | 42 +++++++++++++------ .../cf/ctx/state/CalculatedFieldCtx.java | 33 +++++++++------ .../KafkaCalculatedFieldStateService.java | 19 +-------- .../queue/DefaultTbClusterService.java | 2 - .../sqlts/dictionary/JpaKeyDictionaryDao.java | 2 - .../sql/SqlLatestInsertTsRepository.java | 2 - .../thingsboard/server/dao/util/KvUtils.java | 15 +++---- 9 files changed, 79 insertions(+), 69 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 61d9586095..bd45aa404f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -318,7 +318,6 @@ public class ActorSystemContext { @Getter private TbEntityViewService tbEntityViewService; - @Lazy @Autowired @Getter private TelemetrySubscriptionService tsSubService; @@ -537,17 +536,14 @@ public class ActorSystemContext { @Getter private EntityService entityService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldProcessingService calculatedFieldProcessingService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldStateService calculatedFieldStateService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldQueueService calculatedFieldQueueService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index ed35d96cb7..383c019858 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -18,8 +18,11 @@ package org.thingsboard.server.service.cf; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -49,14 +52,14 @@ import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public class DefaultCalculatedFieldCache implements CalculatedFieldCache { - private static final Integer UNKNOWN_PARTITION = -1; - - private final Lock calculatedFieldFetchLock = new ReentrantLock(); + private final ConcurrentReferenceHashMap calculatedFieldFetchLocks = new ConcurrentReferenceHashMap<>(); private final CalculatedFieldService calculatedFieldService; private final TbelInvokeService tbelInvokeService; - private final ActorSystemContext actorSystemContext; private final ApiLimitService apiLimitService; + @Autowired + @Lazy + private ActorSystemContext actorSystemContext; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -98,19 +101,20 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public List getCalculatedFieldsByEntityId(EntityId entityId) { - return entityIdCalculatedFields.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFields.getOrDefault(entityId, Collections.emptyList()); } @Override public List getCalculatedFieldLinksByEntityId(EntityId entityId) { - return entityIdCalculatedFieldLinks.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFieldLinks.getOrDefault(entityId, Collections.emptyList()); } @Override public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId) { CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { @@ -122,13 +126,17 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { } } } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx); return ctx; } + private Lock getFetchLock(CalculatedFieldId id) { + return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock()); + } + @Override public List getCalculatedFieldCtxsByEntityId(EntityId entityId) { if (entityId == null) { @@ -141,7 +149,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); if (calculatedField == null) { @@ -163,7 +172,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { .add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId)); }); } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8289e4db42..8b724a65ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -133,7 +133,8 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter); if (send) { - clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback)); + ToCalculatedFieldMsg calculatedFieldMsg = msg.get(); + clusterService.pushMsgToCalculatedFields(tenantId, entityId, calculatedFieldMsg, wrap(callback)); } else { if (callback != null) { callback.onSuccess(null); @@ -142,20 +143,35 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate filter, Predicate linkedEntityFilter) { - boolean send = false; - if (supportedReferencedEntities.contains(entityId.getEntityType())) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter); - if (!send) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter); - } - if (!send) { - send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId) - .map(calculatedFieldCache::getCalculatedFieldCtx) - .anyMatch(linkedEntityFilter); + if (!supportedReferencedEntities.contains(entityId.getEntityType())) { + return false; + } + List entityCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId); + for (CalculatedFieldCtx ctx : entityCfs) { + if (filter.test(ctx)) { + return true; } } - return send; + + EntityId profileId = getProfileId(tenantId, entityId); + if (profileId != null) { + List profileCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(profileId); + for (CalculatedFieldCtx ctx : profileCfs) { + if (filter.test(ctx)) { + return true; + } + } + } + + List links = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId); + for (CalculatedFieldLink link : links) { + CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(link.getCalculatedFieldId()); + if (ctx != null && linkedEntityFilter.test(ctx)) { + return true; + } + } + + return false; } private EntityId getProfileId(TenantId tenantId, EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index a3fdae319d..6bcedd61e1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -181,6 +181,10 @@ public class CalculatedFieldCtx { } private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (AttributeKvEntry attrKv : values) { ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -191,13 +195,14 @@ public class CalculatedFieldCtx { } private boolean matchesTimeSeries(Map argMap, List values) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (TsKvEntry tsKv : values) { - ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); - if (argMap.containsKey(latestKey)) { - return true; - } - ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null); - if (argMap.containsKey(rollingKey)) { + String key = tsKv.getKey(); + if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || + argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { return true; } } @@ -213,6 +218,10 @@ public class CalculatedFieldCtx { } private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -223,13 +232,13 @@ public class CalculatedFieldCtx { } private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { - ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); - if (argMap.containsKey(latestKey)) { - return true; - } - ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); - if (argMap.containsKey(rollingKey)) { + if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || + argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 533b487d38..36ae4fd334 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,9 +30,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; -import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; @@ -111,21 +109,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); } - stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - if (callback != null) { - callback.onSuccess(); - } - } - - @Override - public void onFailure(Throwable t) { - if (callback != null) { - callback.onFailure(t); - } - } - }); + stateProducer.send(tpi, stateId.toKey(), msg, null); + callback.onSuccess(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 561dc7122a..b5f7669fe1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -130,11 +130,9 @@ public class DefaultTbClusterService implements TbClusterService { private final AtomicInteger toEdgeNfs = new AtomicInteger(0); @Autowired - @Lazy private PartitionService partitionService; @Autowired - @Lazy private TbQueueProducerProvider producerProvider; @Autowired diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java index 53a824f026..c14c069f23 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java @@ -29,7 +29,6 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; -import org.thingsboard.server.dao.util.SqlDao; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock; @Component @Slf4j -@SqlDao @RequiredArgsConstructor public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index b40fa520f2..63afd2bd26 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -22,7 +22,6 @@ import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; -import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import java.sql.PreparedStatement; @@ -33,7 +32,6 @@ import java.util.List; @SqlTsLatestAnyDao @Repository @Transactional -@SqlDao public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository implements InsertLatestTsRepository { @Value("${sql.ts_latest.update_by_latest_ts:true}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java index eeb88959fa..8b95ddcb57 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java @@ -33,9 +33,8 @@ public class KvUtils { static { validatedKeys = Caffeine.newBuilder() - .weakKeys() .expireAfterAccess(24, TimeUnit.HOURS) - .maximumSize(100000).build(); + .maximumSize(50000).build(); } public static void validate(List tsKvEntries, boolean valueNoXssValidation) { @@ -57,11 +56,13 @@ public class KvUtils { throw new DataValidationException("Validation error: key length must be equal or less than 255"); } - if (validatedKeys.getIfPresent(key) == null) { - if (!NoXssValidator.isValid(key)) { - throw new DataValidationException("Validation error: key is malformed"); - } - validatedKeys.put(key, Boolean.TRUE); + Boolean isValid = validatedKeys.asMap().get(key); + if (isValid == null) { + isValid = NoXssValidator.isValid(key); + validatedKeys.put(key, isValid); + } + if (!isValid) { + throw new DataValidationException("Validation error: key is malformed"); } if (valueNoXssValidation) { From b20c33cd1d34f6478ab8e1f818b185eb4e94612f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 4 Jun 2025 15:14:13 +0300 Subject: [PATCH 2/8] moved method --- .../service/cf/DefaultCalculatedFieldCache.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 383c019858..3c45f3e921 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.cf; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -57,9 +56,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { private final CalculatedFieldService calculatedFieldService; private final TbelInvokeService tbelInvokeService; private final ApiLimitService apiLimitService; - @Autowired @Lazy - private ActorSystemContext actorSystemContext; + private final ActorSystemContext actorSystemContext; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -133,10 +131,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return ctx; } - private Lock getFetchLock(CalculatedFieldId id) { - return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock()); - } - @Override public List getCalculatedFieldCtxsByEntityId(EntityId entityId) { if (entityId == null) { @@ -196,4 +190,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { log.debug("[{}] evict calculated field links from cached links by entity id: {}", calculatedFieldId, oldCalculatedField); } + private Lock getFetchLock(CalculatedFieldId id) { + return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock()); + } + } From edd9a6a552d590ec129416a225edf8193c11c2cc Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 6 Jun 2025 16:38:28 +0300 Subject: [PATCH 3/8] removed unnecessary lazy --- .../server/actors/ActorSystemContext.java | 4 +- .../DefaultCalculatedFieldQueueService.java | 33 +++++++++------- .../cf/ctx/state/CalculatedFieldCtx.java | 39 ++++++++++++++----- .../AttributeKvInsertRepository.java | 4 -- .../sql/SqlLatestInsertTsRepository.java | 2 - 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index a3cd5de7bd..0125c7c07d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -81,6 +81,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.mobile.MobileAppBundleService; import org.thingsboard.server.dao.mobile.MobileAppService; import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; @@ -97,7 +98,6 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleNodeStateService; -import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; @@ -452,12 +452,10 @@ public class ActorSystemContext { @Getter private ApiLimitService apiLimitService; - @Lazy @Autowired(required = false) @Getter private RateLimitService rateLimitService; - @Lazy @Autowired(required = false) @Getter private DebugModeRateLimitsConfig debugModeRateLimitsConfig; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8b724a65ac..bf206bc5dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -40,7 +40,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; @@ -86,7 +85,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), + var entries = request.getEntries(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matches(entries), + cf -> cf.linkMatches(entityId, entries), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -99,7 +101,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), + var entries = request.getEntries(); + var scope = request.getScope(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matches(entries, scope), + cf -> cf.linkMatches(entityId, entries, scope), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -112,7 +118,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()), + var scope = request.getScope(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matchesKeys(result, scope), + cf -> cf.linkMatchesAttrKeys(entityId, result, scope), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -120,8 +129,9 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result), + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matchesKeys(result), + cf -> cf.linkMatchesTsKeys(entityId, result), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -183,23 +193,20 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { - ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); List entries = request.getEntries(); List versions = result != null ? result.getVersions() : Collections.emptyList(); for (int i = 0; i < entries.size(); i++) { - TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); + TsKvEntry tsKvEntry = entries.get(i); if (result != null) { - tsProtoBuilder.setVersion(versions.get(i)); + tsKvEntry.setVersion(versions.get(i)); } - telemetryMsg.addTsData(tsProtoBuilder.build()); + telemetryMsg.addTsData(toTsKvProto(tsKvEntry)); } - msg.setTelemetryMsg(telemetryMsg.build()); - return msg.build(); + return ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(telemetryMsg).build(); } private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 6bcedd61e1..b6c04426ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -185,12 +185,14 @@ public class CalculatedFieldCtx { return false; } + var lookupKey = new ReferencedEntityKey(null, ArgumentType.ATTRIBUTE, scope); for (AttributeKvEntry attrKv : values) { - ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); - if (argMap.containsKey(attrKey)) { + lookupKey.setKey(attrKv.getKey()); + if (argMap.containsKey(lookupKey)) { return true; } } + return false; } @@ -199,13 +201,21 @@ public class CalculatedFieldCtx { return false; } + var lookupKey = new ReferencedEntityKey(null, null, null); for (TsKvEntry tsKv : values) { - String key = tsKv.getKey(); - if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || - argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { + lookupKey.setKey(tsKv.getKey()); + + lookupKey.setType(ArgumentType.TS_LATEST); + if (argMap.containsKey(lookupKey)) { + return true; + } + + lookupKey.setType(ArgumentType.TS_ROLLING); + if (argMap.containsKey(lookupKey)) { return true; } } + return false; } @@ -222,12 +232,14 @@ public class CalculatedFieldCtx { return false; } + var lookupKey = new ReferencedEntityKey(null, ArgumentType.ATTRIBUTE, scope); for (String key : keys) { - ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); - if (argMap.containsKey(attrKey)) { + lookupKey.setKey(key); + if (argMap.containsKey(lookupKey)) { return true; } } + return false; } @@ -236,12 +248,21 @@ public class CalculatedFieldCtx { return false; } + var lookupKey = new ReferencedEntityKey(null, null, null); for (String key : keys) { - if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || - argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { + lookupKey.setKey(key); + + lookupKey.setType(ArgumentType.TS_LATEST); + if (argMap.containsKey(lookupKey)) { + return true; + } + + lookupKey.setType(ArgumentType.TS_ROLLING); + if (argMap.containsKey(lookupKey)) { return true; } } + return false; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index 48a1eb6d5a..801c436d2f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -16,10 +16,8 @@ package org.thingsboard.server.dao.sql.attributes; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; -import org.thingsboard.server.dao.util.SqlDao; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -27,8 +25,6 @@ import java.sql.Types; import java.util.List; @Repository -@Transactional -@SqlDao public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository { private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " + diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index 63afd2bd26..c9d16f00ec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.sqlts.insert.latest.sql; import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; @@ -31,7 +30,6 @@ import java.util.List; @SqlTsLatestAnyDao @Repository -@Transactional public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository implements InsertLatestTsRepository { @Value("${sql.ts_latest.update_by_latest_ts:true}") From 0ade84c842c824300eafdf797e5ae7396e6197ec Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 11 Jun 2025 16:27:03 +0300 Subject: [PATCH 4/8] trigger processing using telemetry sub service in CF node --- .../DefaultCalculatedFieldQueueService.java | 2 +- .../ctx/state/SimpleCalculatedFieldState.java | 21 +++++++++++++++++-- .../DefaultTelemetrySubscriptionService.java | 7 ++++--- .../src/main/resources/thingsboard.yml | 1 + .../engine/api/AttributesSaveRequest.java | 1 + .../engine/api/TimeseriesSaveRequest.java | 1 + .../telemetry/TbCalculatedFieldsNode.java | 6 ++++-- 7 files changed, 31 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index bf206bc5dc..0c2d1a24a9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -200,7 +200,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS for (int i = 0; i < entries.size(); i++) { TsKvEntry tsKvEntry = entries.get(i); - if (result != null) { + if (versions != null && !versions.isEmpty() && versions.get(i) != null) { tsKvEntry.setVersion(versions.get(i)); } telemetryMsg.addTsData(toTsKvProto(tsKvEntry)); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index d0eba5031c..03399348cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -58,7 +58,18 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { for (Map.Entry entry : this.arguments.entrySet()) { try { BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue(); - expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString())); + try { + double value = switch (kvEntry.getDataType()) { + case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); + case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); + case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); + case STRING -> Double.parseDouble(kvEntry.getValueAsString()); + case JSON -> Double.parseDouble(kvEntry.getValueAsString()); + }; + expr.setVariable(entry.getKey(), value); + } catch (Exception e) { + throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.", e); + } } catch (NumberFormatException e) { throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number."); } @@ -85,7 +96,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { private JsonNode createResultJson(boolean preserveMsgTs, String outputName, Object result) { ObjectNode valuesNode = JacksonUtil.newObjectNode(); - valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + if (result instanceof Double doubleValue) { + valuesNode.put(outputName, doubleValue); + } else if (result instanceof Integer integerValue) { + valuesNode.put(outputName, integerValue); + } else { + valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + } long lastTimestamp = getLastUpdateTimestamp(); if (preserveMsgTs && lastTimestamp != -1) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 30f2885e78..0f4f615af3 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.DeviceStateManager; @@ -69,7 +69,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; import static java.util.Comparator.comparing; @@ -96,6 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Value("${sql.ts.value_no_xss_validation:false}") private boolean valueNoXssValidation; + @Value("${sql.ts.thread_pool_size:12}") + private int threadPoolSize; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, @@ -116,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @PostConstruct public void initExecutor() { super.initExecutor(); - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback")); + tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback"); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6d27b9bd7e..e170695b90 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,6 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability + thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}"# Thread pool size to execute dynamic queries ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index c3095836bd..19c1956463 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -57,6 +57,7 @@ public class AttributesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy PROCESS_ALL = new Strategy(true, true, true); public static final Strategy WS_ONLY = new Strategy(false, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, true); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index c402a0c984..3bacf8d8c3 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -56,6 +56,7 @@ public class TimeseriesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy WS_ONLY = new Strategy(false, false, true, false); public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, false, true); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index b2cce52c87..4e319500d1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -92,13 +92,14 @@ public class TbCalculatedFieldsNode implements TbNode { .customerId(msg.getCustomerId()) .entityId(msg.getOriginator()) .entries(tsKvEntryList) + .strategy(TimeseriesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); + ctx.getTelemetryService().saveTimeseries(timeseriesSaveRequest); } private void processPostAttributesRequest(TbContext ctx, TbMsg msg) { @@ -114,12 +115,13 @@ public class TbCalculatedFieldsNode implements TbNode { .entityId(msg.getOriginator()) .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) .entries(newAttributes) + .strategy(AttributesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); + ctx.getTelemetryService().saveAttributes(attributesSaveRequest); } } From 41117aff2d531517380dbc7cef2c634cb7fd1514 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 12 Jun 2025 12:26:30 +0300 Subject: [PATCH 5/8] removed redudant try-catch --- ...CalculatedFieldEntityMessageProcessor.java | 3 +-- ...faultCalculatedFieldProcessingService.java | 4 +--- .../ctx/state/SimpleCalculatedFieldState.java | 19 +++++++------------ .../src/main/resources/thingsboard.yml | 2 +- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 5ea79e42a8..ebc4e60709 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.DebugModeUtil; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; @@ -299,7 +298,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM callback.onSuccess(); } if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, calculationResult.getResult().toString(), null); } } } else { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index e9a6cb09aa..f2a6916751 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -25,7 +25,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; @@ -66,7 +65,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; @@ -171,7 +169,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP OutputType type = calculatedFieldResult.getType(); TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; - TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(calculatedFieldResult.getResult())).build(); + TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(calculatedFieldResult.getResult().toString()).build(); clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index c07923e470..577ff80219 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -58,18 +58,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { for (Map.Entry entry : this.arguments.entrySet()) { try { BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue(); - try { - double value = switch (kvEntry.getDataType()) { - case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); - case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); - case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); - case STRING -> Double.parseDouble(kvEntry.getValueAsString()); - case JSON -> Double.parseDouble(kvEntry.getValueAsString()); - }; - expr.setVariable(entry.getKey(), value); - } catch (Exception e) { - throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.", e); - } + double value = switch (kvEntry.getDataType()) { + case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); + case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); + case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); + case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); + }; + expr.setVariable(entry.getKey(), value); } catch (NumberFormatException e) { throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number."); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c04969bfc7..958ad2f84c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,7 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability - thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}"# Thread pool size to execute dynamic queries + thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds From 29211774f590b1ce1f8ebbc56a30f67c14dfb22f Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 13 Jun 2025 17:19:54 +0300 Subject: [PATCH 6/8] Minor refactoring --- .../DefaultCalculatedFieldQueueService.java | 7 +++-- .../cf/ctx/state/CalculatedFieldCtx.java | 29 +++++++------------ 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index a423eb6669..fc5d75be56 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -41,6 +41,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; @@ -200,11 +201,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS List versions = result != null ? result.getVersions() : Collections.emptyList(); for (int i = 0; i < entries.size(); i++) { - TsKvEntry tsKvEntry = entries.get(i); + TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); if (versions != null && !versions.isEmpty() && versions.get(i) != null) { - tsKvEntry.setVersion(versions.get(i)); + tsProtoBuilder.setVersion(versions.get(i)); } - telemetryMsg.addTsData(toTsKvProto(tsKvEntry)); + telemetryMsg.addTsData(tsProtoBuilder.build()); } return ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(telemetryMsg).build(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 0008874779..2e3321eece 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -185,10 +185,8 @@ public class CalculatedFieldCtx { return false; } - var lookupKey = new ReferencedEntityKey(null, ArgumentType.ATTRIBUTE, scope); for (AttributeKvEntry attrKv : values) { - lookupKey.setKey(attrKv.getKey()); - if (argMap.containsKey(lookupKey)) { + if (argMap.containsKey(new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope))) { return true; } } @@ -201,17 +199,15 @@ public class CalculatedFieldCtx { return false; } - var lookupKey = new ReferencedEntityKey(null, null, null); for (TsKvEntry tsKv : values) { - lookupKey.setKey(tsKv.getKey()); - lookupKey.setType(ArgumentType.TS_LATEST); - if (argMap.containsKey(lookupKey)) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); + if (argMap.containsKey(latestKey)) { return true; } - lookupKey.setType(ArgumentType.TS_ROLLING); - if (argMap.containsKey(lookupKey)) { + ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null); + if (argMap.containsKey(rollingKey)) { return true; } } @@ -232,10 +228,9 @@ public class CalculatedFieldCtx { return false; } - var lookupKey = new ReferencedEntityKey(null, ArgumentType.ATTRIBUTE, scope); for (String key : keys) { - lookupKey.setKey(key); - if (argMap.containsKey(lookupKey)) { + ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); + if (argMap.containsKey(attrKey)) { return true; } } @@ -248,17 +243,15 @@ public class CalculatedFieldCtx { return false; } - var lookupKey = new ReferencedEntityKey(null, null, null); for (String key : keys) { - lookupKey.setKey(key); - lookupKey.setType(ArgumentType.TS_LATEST); - if (argMap.containsKey(lookupKey)) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); + if (argMap.containsKey(latestKey)) { return true; } - lookupKey.setType(ArgumentType.TS_ROLLING); - if (argMap.containsKey(lookupKey)) { + ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); + if (argMap.containsKey(rollingKey)) { return true; } } From bb570add28d7e93d0834d67b4cdf7d553a9ab9e7 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 13 Jun 2025 18:04:42 +0300 Subject: [PATCH 7/8] test fixes --- .../cf/AbstractCalculatedFieldStateService.java | 2 ++ .../ctx/state/KafkaCalculatedFieldStateService.java | 13 ++++++++++++- .../cf/ctx/state/SimpleCalculatedFieldState.java | 3 +-- .../DefaultTelemetrySubscriptionService.java | 6 +++--- application/src/main/resources/thingsboard.yml | 2 +- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index f1cb25c6fa..70b41f069e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.cf; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -39,6 +40,7 @@ import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { @Autowired + @Lazy private ActorSystemContext actorSystemContext; protected QueueStateService, TbProtoQueueMsg> stateService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 6641a06b1d..2b52892744 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,7 +30,9 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; @@ -109,7 +111,16 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); } - stateProducer.send(tpi, stateId.toKey(), msg, null); + stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + } + + @Override + public void onFailure(Throwable t) { + log.error("Failed to send state message: {}", stateId, t); + } + }); callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index 577ff80219..026461bd48 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -61,8 +61,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { double value = switch (kvEntry.getDataType()) { case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); - case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); - case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); + case BOOLEAN, STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); }; expr.setVariable(entry.getKey(), value); } catch (NumberFormatException e) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a14e052659..69b41addf9 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -95,8 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Value("${sql.ts.value_no_xss_validation:false}") private boolean valueNoXssValidation; - @Value("${sql.ts.thread_pool_size:12}") - private int threadPoolSize; + @Value("${sql.ts.callback_thread_pool_size:12}") + private int callbackThreadPoolSize; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, @@ -117,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @PostConstruct public void initExecutor() { super.initExecutor(); - tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback"); + tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, "ts-service-ts-callback"); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 958ad2f84c..c3d28f9fce 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,7 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability - thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor + callback_thread_pool_size: "${SQL_TS_CALLBACK_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds From 916dc6cb6e2caffd82a6fd25c1471ddf6cee2ed3 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 16 Jun 2025 07:39:23 +0300 Subject: [PATCH 8/8] added test --- .../ctx/state/SimpleCalculatedFieldState.java | 3 ++- .../state/SimpleCalculatedFieldStateTest.java | 20 ++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index 026461bd48..577ff80219 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -61,7 +61,8 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { double value = switch (kvEntry.getDataType()) { case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); - case BOOLEAN, STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); + case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); + case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); }; expr.setVariable(entry.getKey(), value); } catch (NumberFormatException e) { diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java index f0059e4f6f..c1616a85db 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -143,7 +144,7 @@ public class SimpleCalculatedFieldStateTest { } @Test - void testPerformCalculationWhenPassedNotNumber() { + void testPerformCalculationWhenPassedString() { state.arguments = new HashMap<>(Map.of( "key1", key1ArgEntry, "key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new StringDataEntry("key2", "string"), 124L), @@ -155,6 +156,23 @@ public class SimpleCalculatedFieldStateTest { .hasMessage("Argument 'key2' is not a number."); } + @Test + void testPerformCalculationWhenPassedBoolean() throws ExecutionException, InterruptedException { + state.arguments = new HashMap<>(Map.of( + "key1", key1ArgEntry, + "key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new BooleanDataEntry("key2", true), 124L),// true is parsed as 1 + "key3", key3ArgEntry + )); + + CalculatedFieldResult result = state.performCalculation(ctx).get(); + + assertThat(result).isNotNull(); + Output output = getCalculatedFieldConfig().getOutput(); + assertThat(result.getType()).isEqualTo(output.getType()); + assertThat(result.getScope()).isEqualTo(output.getScope()); + assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("output", 35))); + } + @Test void testPerformCalculationWhenDecimalsByDefault() throws ExecutionException, InterruptedException { state.arguments = new HashMap<>(Map.of(