diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 61d9586095..bd45aa404f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -318,7 +318,6 @@ public class ActorSystemContext { @Getter private TbEntityViewService tbEntityViewService; - @Lazy @Autowired @Getter private TelemetrySubscriptionService tsSubService; @@ -537,17 +536,14 @@ public class ActorSystemContext { @Getter private EntityService entityService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldProcessingService calculatedFieldProcessingService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldStateService calculatedFieldStateService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldQueueService calculatedFieldQueueService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index ed35d96cb7..383c019858 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -18,8 +18,11 @@ package org.thingsboard.server.service.cf; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -49,14 +52,14 @@ import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public class DefaultCalculatedFieldCache implements CalculatedFieldCache { - private static final Integer UNKNOWN_PARTITION = -1; - - private final Lock calculatedFieldFetchLock = new ReentrantLock(); + private final ConcurrentReferenceHashMap calculatedFieldFetchLocks = new ConcurrentReferenceHashMap<>(); private final CalculatedFieldService calculatedFieldService; private final TbelInvokeService tbelInvokeService; - private final ActorSystemContext actorSystemContext; private final ApiLimitService apiLimitService; + @Autowired + @Lazy + private ActorSystemContext actorSystemContext; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -98,19 +101,20 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public List getCalculatedFieldsByEntityId(EntityId entityId) { - return entityIdCalculatedFields.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFields.getOrDefault(entityId, Collections.emptyList()); } @Override public List getCalculatedFieldLinksByEntityId(EntityId entityId) { - return entityIdCalculatedFieldLinks.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFieldLinks.getOrDefault(entityId, Collections.emptyList()); } @Override public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId) { CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { @@ -122,13 +126,17 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { } } } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx); return ctx; } + private Lock getFetchLock(CalculatedFieldId id) { + return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock()); + } + @Override public List getCalculatedFieldCtxsByEntityId(EntityId entityId) { if (entityId == null) { @@ -141,7 +149,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); if (calculatedField == null) { @@ -163,7 +172,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { .add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId)); }); } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 8289e4db42..8b724a65ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -133,7 +133,8 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter); if (send) { - clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback)); + ToCalculatedFieldMsg calculatedFieldMsg = msg.get(); + clusterService.pushMsgToCalculatedFields(tenantId, entityId, calculatedFieldMsg, wrap(callback)); } else { if (callback != null) { callback.onSuccess(null); @@ -142,20 +143,35 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate filter, Predicate linkedEntityFilter) { - boolean send = false; - if (supportedReferencedEntities.contains(entityId.getEntityType())) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter); - if (!send) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter); - } - if (!send) { - send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId) - .map(calculatedFieldCache::getCalculatedFieldCtx) - .anyMatch(linkedEntityFilter); + if (!supportedReferencedEntities.contains(entityId.getEntityType())) { + return false; + } + List entityCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId); + for (CalculatedFieldCtx ctx : entityCfs) { + if (filter.test(ctx)) { + return true; } } - return send; + + EntityId profileId = getProfileId(tenantId, entityId); + if (profileId != null) { + List profileCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(profileId); + for (CalculatedFieldCtx ctx : profileCfs) { + if (filter.test(ctx)) { + return true; + } + } + } + + List links = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId); + for (CalculatedFieldLink link : links) { + CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(link.getCalculatedFieldId()); + if (ctx != null && linkedEntityFilter.test(ctx)) { + return true; + } + } + + return false; } private EntityId getProfileId(TenantId tenantId, EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index a3fdae319d..6bcedd61e1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -181,6 +181,10 @@ public class CalculatedFieldCtx { } private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (AttributeKvEntry attrKv : values) { ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -191,13 +195,14 @@ public class CalculatedFieldCtx { } private boolean matchesTimeSeries(Map argMap, List values) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (TsKvEntry tsKv : values) { - ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); - if (argMap.containsKey(latestKey)) { - return true; - } - ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null); - if (argMap.containsKey(rollingKey)) { + String key = tsKv.getKey(); + if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || + argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { return true; } } @@ -213,6 +218,10 @@ public class CalculatedFieldCtx { } private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -223,13 +232,13 @@ public class CalculatedFieldCtx { } private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { - ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); - if (argMap.containsKey(latestKey)) { - return true; - } - ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); - if (argMap.containsKey(rollingKey)) { + if (argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null)) || + argMap.containsKey(new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null))) { return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 533b487d38..36ae4fd334 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,9 +30,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; -import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; @@ -111,21 +109,8 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); } - stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - if (callback != null) { - callback.onSuccess(); - } - } - - @Override - public void onFailure(Throwable t) { - if (callback != null) { - callback.onFailure(t); - } - } - }); + stateProducer.send(tpi, stateId.toKey(), msg, null); + callback.onSuccess(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 561dc7122a..b5f7669fe1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -130,11 +130,9 @@ public class DefaultTbClusterService implements TbClusterService { private final AtomicInteger toEdgeNfs = new AtomicInteger(0); @Autowired - @Lazy private PartitionService partitionService; @Autowired - @Lazy private TbQueueProducerProvider producerProvider; @Autowired diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java index 53a824f026..c14c069f23 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java @@ -29,7 +29,6 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; -import org.thingsboard.server.dao.util.SqlDao; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock; @Component @Slf4j -@SqlDao @RequiredArgsConstructor public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index b40fa520f2..63afd2bd26 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -22,7 +22,6 @@ import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; -import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import java.sql.PreparedStatement; @@ -33,7 +32,6 @@ import java.util.List; @SqlTsLatestAnyDao @Repository @Transactional -@SqlDao public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository implements InsertLatestTsRepository { @Value("${sql.ts_latest.update_by_latest_ts:true}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java index eeb88959fa..8b95ddcb57 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java @@ -33,9 +33,8 @@ public class KvUtils { static { validatedKeys = Caffeine.newBuilder() - .weakKeys() .expireAfterAccess(24, TimeUnit.HOURS) - .maximumSize(100000).build(); + .maximumSize(50000).build(); } public static void validate(List tsKvEntries, boolean valueNoXssValidation) { @@ -57,11 +56,13 @@ public class KvUtils { throw new DataValidationException("Validation error: key length must be equal or less than 255"); } - if (validatedKeys.getIfPresent(key) == null) { - if (!NoXssValidator.isValid(key)) { - throw new DataValidationException("Validation error: key is malformed"); - } - validatedKeys.put(key, Boolean.TRUE); + Boolean isValid = validatedKeys.asMap().get(key); + if (isValid == null) { + isValid = NoXssValidator.isValid(key); + validatedKeys.put(key, isValid); + } + if (!isValid) { + throw new DataValidationException("Validation error: key is malformed"); } if (valueNoXssValidation) {