From 71583c5a292b33ef73700718d1386f7cf2df7bb3 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 7 Mar 2024 17:41:51 +0100 Subject: [PATCH 1/8] TbAbstractGetAttributesNode: get latest moved to the last step because SqlTimeseriesLatestDao.findLatest() effectively do blocking DB call, so getting attr async will do the job without awaiting the latest response. --- .../rule/engine/metadata/TbAbstractGetAttributesNode.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index fab1893702..7cf144ec66 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -99,10 +99,10 @@ public abstract class TbAbstractGetAttributesNode>> failuresPairSet = ConcurrentHashMap.newKeySet(); var getKvEntryPairFutures = Futures.allAsList( - getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresPairSet), - getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet) + getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet), + getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet) ); withCallback(getKvEntryPairFutures, futuresList -> { var msgMetaData = msg.getMetaData().copy(); From c8abb9ed8eefa3b2bd9df16db1f754d014d2d0f7 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 7 Mar 2024 19:01:10 +0100 Subject: [PATCH 2/8] Validator.validateString implementation with function --- .../thingsboard/server/dao/service/Validator.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java index 77f81d639f..16454e5914 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java @@ -27,6 +27,7 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException; import java.util.List; import java.util.UUID; +import java.util.function.Function; import java.util.regex.Pattern; public class Validator { @@ -59,6 +60,18 @@ public class Validator { } } + /* + * This method validate String string. If string is invalid than throw + * IncorrectParameterException exception + * + * @param val the value + * @param errorMessageFunction the error message function that apply value + */ + public static void validateString(String val, Function errorMessageFunction) { + if (val == null || val.isEmpty()) { + throw new IncorrectParameterException(errorMessageFunction.apply(val)); + } + } /** * This method validate long value. If value isn't positive than throw From 8c513bbf82b0c023acfa2789ee4fb012fe4dcc33 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 7 Mar 2024 19:03:14 +0100 Subject: [PATCH 3/8] BaseTimeseriesService reduce memory footprint at findLatest --- .../dao/timeseries/BaseTimeseriesService.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 1f77fa3c1d..8d04d1f570 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.service.Validator; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -126,18 +127,22 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> findLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); - keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); - keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); + List> futures = new ArrayList<>(keys.size()); + keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); + for (String key : keys) { + futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key)); + } return Futures.allAsList(futures); } @Override public List findLatestSync(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List latestEntries = Lists.newArrayListWithExpectedSize(keys.size()); - keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); - keys.forEach(key -> latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key))); + List latestEntries = new ArrayList(keys.size()); + keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); + for (String key : keys) { + latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key)); + } return latestEntries; } From 45e9a9f642b9c5167738ba73437fbab533e49fb1 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 7 Mar 2024 20:26:59 +0100 Subject: [PATCH 4/8] Validator.validateEntityId with function added to reduce an overhead --- .../thingsboard/server/dao/service/Validator.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java index 16454e5914..b3a121baf6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/Validator.java @@ -47,6 +47,19 @@ public class Validator { } } + /** + * This method validate EntityId entity id. If entity id is invalid than throw + * IncorrectParameterException exception + * + * @param entityId the entityId + * @param errorMessageFunction the error message for exception that apply entityId + */ + public static void validateEntityId(EntityId entityId, Function errorMessageFunction) { + if (entityId == null || entityId.getId() == null) { + throw new IncorrectParameterException(errorMessageFunction.apply(entityId)); + } + } + /** * This method validate String string. If string is invalid than throw * IncorrectParameterException exception From c97cbbefa3199c4401d191e2803310a6efb59468 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 7 Mar 2024 20:27:29 +0100 Subject: [PATCH 5/8] BaseTimeseriesService reduce memory footprint at validateEntityId --- .../server/dao/timeseries/BaseTimeseriesService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 8d04d1f570..50c884f86e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -286,7 +286,7 @@ public class BaseTimeseriesService implements TimeseriesService { } private static void validate(EntityId entityId) { - Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); + Validator.validateEntityId(entityId, id -> "Incorrect entityId " + id); } private void validate(ReadTsKvQuery query) { From c2183be6f551021a24aab2e4742261e3b8161504 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 8 Mar 2024 10:39:56 +0100 Subject: [PATCH 6/8] BaseTimeseriesService Memory footprint reduced. replaced Lists.newArrayListWithExpectedSize with new ArrayList(size) as we are always use fixed size array and never exceed the initial size. The newArrayListWithExpectedSize implementation adds some additional space to grow beyond initial size. --- .../dao/timeseries/BaseTimeseriesService.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 50c884f86e..cfba6786f3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.timeseries; import com.google.common.base.Function; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -138,7 +137,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public List findLatestSync(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List latestEntries = new ArrayList(keys.size()); + List latestEntries = new ArrayList<>(keys.size()); keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k)); for (String key : keys) { latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key)); @@ -170,7 +169,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); + List> futures = new ArrayList<>(INSERTS_PER_ENTRY); saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L); return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); } @@ -187,7 +186,7 @@ public class BaseTimeseriesService implements TimeseriesService { private ListenableFuture doSave(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest) { int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST; - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * inserts); + List> futures = new ArrayList<>(tsKvEntries.size() * inserts); for (TsKvEntry tsKvEntry : tsKvEntries) { if (saveLatest) { saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); @@ -200,7 +199,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size()); + List> futures = new ArrayList<>(tsKvEntries.size()); for (TsKvEntry tsKvEntry : tsKvEntries) { futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); } @@ -247,7 +246,7 @@ public class BaseTimeseriesService implements TimeseriesService { public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { validate(entityId); deleteTsKvQueries.forEach(BaseTimeseriesService::validate); - List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); + List> futures = new ArrayList<>(deleteTsKvQueries.size() * DELETES_PER_ENTRY); for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); } @@ -257,7 +256,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + List> futures = new ArrayList<>(keys.size()); for (String key : keys) { DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); From 59d0f36697c0e7bad424670b4b8c97a7ce3f3652 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 8 Mar 2024 15:22:16 +0100 Subject: [PATCH 7/8] CachedAttributesService: replaced blocking calls and immediate futures with the true async --- .../attributes/CachedAttributesService.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 1666015a48..8058b0cee5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -113,15 +113,15 @@ public class CachedAttributesService implements AttributesService { validate(entityId, scope); Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); - AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); - TbCacheValueWrapper cachedAttributeValue = cache.get(attributeCacheKey); - if (cachedAttributeValue != null) { - hitCounter.increment(); - AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); - return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry)); - } else { - missCounter.increment(); - return cacheExecutor.submit(() -> { + return cacheExecutor.submit(() -> { + AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); + TbCacheValueWrapper cachedAttributeValue = cache.get(attributeCacheKey); + if (cachedAttributeValue != null) { + hitCounter.increment(); + AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); + return Optional.ofNullable(cachedAttributeKvEntry); + } else { + missCounter.increment(); var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); try { Optional result = attributesDao.find(tenantId, entityId, scope, attributeKey); @@ -133,8 +133,8 @@ public class CachedAttributesService implements AttributesService { log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e); throw e; } - }); - } + } + }); } @Override @@ -207,7 +207,8 @@ public class CachedAttributesService implements AttributesService { @Override public ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope) { validate(entityId, scope); - return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope)); + // We can`t watch on cache because the keys are unknown. + return jpaExecutorService.submit(() -> attributesDao.findAll(tenantId, entityId, scope)); } @Override From c4b243ef10f7e5b3c6ea534f864c8f13e92e4221 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 8 Mar 2024 18:20:13 +0100 Subject: [PATCH 8/8] SqlTimeseriesLatestDao async refactoring: replaced Futures.immediateFuture() blocking call with true async using service.submit() and Futures.transformAsync() --- .../dao/sqlts/SqlTimeseriesLatestDao.java | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 112daf0637..f1a58aa417 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -151,12 +151,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme @Override public ListenableFuture> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key))); + return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key))); } @Override public ListenableFuture findLatest(TenantId tenantId, EntityId entityId, String key) { - return Futures.immediateFuture(getLatestTsKvEntry(entityId, key)); + return service.submit(() -> getLatestTsKvEntry(entityId, key)); } @Override @@ -221,36 +221,29 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { - TsKvEntry latest = doFindLatest(entityId, query.getKey()); - - if (latest == null) { - return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); - } - - long ts = latest.getTs(); - ListenableFuture removedLatestFuture; - if (ts >= query.getStartTs() && ts < query.getEndTs()) { - TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityId(entityId.getId()); - latestEntity.setKey(getOrSaveKeyId(query.getKey())); - removedLatestFuture = service.submit(() -> { + ListenableFuture latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey())); + return Futures.transformAsync(latestFuture, latest -> { + if (latest == null) { + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); + } + boolean isRemoved = false; + long ts = latest.getTs(); + if (ts >= query.getStartTs() && ts < query.getEndTs()) { + TsKvLatestEntity latestEntity = new TsKvLatestEntity(); + latestEntity.setEntityId(entityId.getId()); + latestEntity.setKey(getOrSaveKeyId(query.getKey())); tsKvLatestRepository.delete(latestEntity); - return true; - }); - } else { - removedLatestFuture = Futures.immediateFuture(false); - } - - return Futures.transformAsync(removedLatestFuture, isRemoved -> { - if (isRemoved && query.getRewriteLatestIfDeleted()) { - return getNewLatestEntryFuture(tenantId, entityId, query); + isRemoved = true; + if (query.getRewriteLatestIfDeleted()) { + return getNewLatestEntryFuture(tenantId, entityId, query); + } } return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); }, MoreExecutors.directExecutor()); } protected ListenableFuture> getFindAllLatestFuture(EntityId entityId) { - return Futures.immediateFuture( + return service.submit(() -> DaoUtil.convertDataList(Lists.newArrayList( searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); }