diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 1666015a48..8058b0cee5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -113,15 +113,15 @@ public class CachedAttributesService implements AttributesService { validate(entityId, scope); Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); - AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); - TbCacheValueWrapper cachedAttributeValue = cache.get(attributeCacheKey); - if (cachedAttributeValue != null) { - hitCounter.increment(); - AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); - return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry)); - } else { - missCounter.increment(); - return cacheExecutor.submit(() -> { + return cacheExecutor.submit(() -> { + AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); + TbCacheValueWrapper cachedAttributeValue = cache.get(attributeCacheKey); + if (cachedAttributeValue != null) { + hitCounter.increment(); + AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); + return Optional.ofNullable(cachedAttributeKvEntry); + } else { + missCounter.increment(); var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); try { Optional result = attributesDao.find(tenantId, entityId, scope, attributeKey); @@ -133,8 +133,8 @@ public class CachedAttributesService implements AttributesService { log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e); throw e; } - }); - } + } + }); } @Override @@ -207,7 +207,8 @@ public class CachedAttributesService implements AttributesService { @Override public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope) { validate(entityId, scope); - return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope)); + // We can`t watch on cache because the keys are unknown. + return jpaExecutorService.submit(() -> attributesDao.findAll(tenantId, entityId, scope)); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java index 77f81d639f..b3a121baf6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java @@ -27,6 +27,7 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException; import java.util.List; import java.util.UUID; +import java.util.function.Function; import java.util.regex.Pattern; public class Validator { @@ -46,6 +47,19 @@ public class Validator { } } + /** + * This method validate EntityId entity id. If entity id is invalid than throw + * IncorrectParameterException exception + * + * @param entityId the entityId + * @param errorMessageFunction the error message for exception that apply entityId + */ + public static void validateEntityId(EntityId entityId, Function errorMessageFunction) { + if (entityId == null || entityId.getId() == null) { + throw new IncorrectParameterException(errorMessageFunction.apply(entityId)); + } + } + /** * This method validate String string. If string is invalid than throw * IncorrectParameterException exception @@ -59,6 +73,18 @@ public class Validator { } } + /* + * This method validate String string. If string is invalid than throw + * IncorrectParameterException exception + * + * @param val the value + * @param errorMessageFunction the error message function that apply value + */ + public static void validateString(String val, Function errorMessageFunction) { + if (val == null || val.isEmpty()) { + throw new IncorrectParameterException(errorMessageFunction.apply(val)); + } + } /** * This method validate long value. If value isn't positive than throw diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 112daf0637..f1a58aa417 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -151,12 +151,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key))); + return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key))); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(getLatestTsKvEntry(entityId, key)); + return service.submit(() -> getLatestTsKvEntry(entityId, key)); } @Override @@ -221,36 +221,29 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - TsKvEntry latest = doFindLatest(entityId, query.getKey()); - - if (latest == null) { - return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); - } - - long ts = latest.getTs(); - ListenableFuture removedLatestFuture; - if (ts >= query.getStartTs() && ts < query.getEndTs()) { - TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityId(entityId.getId()); - latestEntity.setKey(getOrSaveKeyId(query.getKey())); - removedLatestFuture = service.submit(() -> { + ListenableFuture latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey())); + return Futures.transformAsync(latestFuture, latest -> { + if (latest == null) { + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); + } + boolean isRemoved = false; + long ts = latest.getTs(); + if (ts >= query.getStartTs() && ts < query.getEndTs()) { + TsKvLatestEntity latestEntity = new TsKvLatestEntity(); + latestEntity.setEntityId(entityId.getId()); + latestEntity.setKey(getOrSaveKeyId(query.getKey())); tsKvLatestRepository.delete(latestEntity); - return true; - }); - } else { - removedLatestFuture = Futures.immediateFuture(false); - } - - return Futures.transformAsync(removedLatestFuture, isRemoved -> { - if (isRemoved && query.getRewriteLatestIfDeleted()) { - return getNewLatestEntryFuture(tenantId, entityId, query); + isRemoved = true; + if (query.getRewriteLatestIfDeleted()) { + return getNewLatestEntryFuture(tenantId, entityId, query); + } } return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); }, MoreExecutors.directExecutor()); } protected ListenableFuture> getFindAllLatestFuture(EntityId entityId) { - return Futures.immediateFuture( + return service.submit(() -> DaoUtil.convertDataList(Lists.newArrayList( searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 1f77fa3c1d..cfba6786f3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.timeseries; import com.google.common.base.Function; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -43,6 +42,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.service.Validator; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -126,18 +126,22 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> findLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); - keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); - keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); + List> futures = new ArrayList<>(keys.size()); + keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); + for (String key : keys) { + futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key)); + } return Futures.allAsList(futures); } @Override public List findLatestSync(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List latestEntries = Lists.newArrayListWithExpectedSize(keys.size()); - keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); - keys.forEach(key -> latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key))); + List latestEntries = new ArrayList<>(keys.size()); + keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); + for (String key : keys) { + latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key)); + } return latestEntries; } @@ -165,7 +169,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); + List> futures = new ArrayList<>(INSERTS_PER_ENTRY); saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L); return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); } @@ -182,7 +186,7 @@ public class BaseTimeseriesService implements TimeseriesService { private ListenableFuture doSave(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest) { int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST; - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * inserts); + List> futures = new ArrayList<>(tsKvEntries.size() * inserts); for (TsKvEntry tsKvEntry : tsKvEntries) { if (saveLatest) { saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); @@ -195,7 +199,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size()); + List> futures = new ArrayList<>(tsKvEntries.size()); for (TsKvEntry tsKvEntry : tsKvEntries) { futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); } @@ -242,7 +246,7 @@ public class BaseTimeseriesService implements TimeseriesService { public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { validate(entityId); deleteTsKvQueries.forEach(BaseTimeseriesService::validate); - List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); + List> futures = new ArrayList<>(deleteTsKvQueries.size() * DELETES_PER_ENTRY); for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); } @@ -252,7 +256,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + List> futures = new ArrayList<>(keys.size()); for (String key : keys) { DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); @@ -281,7 +285,7 @@ public class BaseTimeseriesService implements TimeseriesService { } private static void validate(EntityId entityId) { - Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); + Validator.validateEntityId(entityId, id -> "Incorrect entityId " + id); } private void validate(ReadTsKvQuery query) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index fab1893702..7cf144ec66 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -99,10 +99,10 @@ public abstract class TbAbstractGetAttributesNode>> failuresPairSet = ConcurrentHashMap.newKeySet(); var getKvEntryPairFutures = Futures.allAsList( - getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresPairSet), - getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet) + getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet), + getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet) ); withCallback(getKvEntryPairFutures, futuresList -> { var msgMetaData = msg.getMetaData().copy();