Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2024-03-11 16:12:56 +02:00
commit 05223724a8
5 changed files with 76 additions and 52 deletions

View File

@ -113,15 +113,15 @@ public class CachedAttributesService implements AttributesService {
validate(entityId, scope); validate(entityId, scope);
Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
return cacheExecutor.submit(() -> {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey); AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey);
TbCacheValueWrapper<AttributeKvEntry> cachedAttributeValue = cache.get(attributeCacheKey); TbCacheValueWrapper<AttributeKvEntry> cachedAttributeValue = cache.get(attributeCacheKey);
if (cachedAttributeValue != null) { if (cachedAttributeValue != null) {
hitCounter.increment(); hitCounter.increment();
AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get(); AttributeKvEntry cachedAttributeKvEntry = cachedAttributeValue.get();
return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry)); return Optional.ofNullable(cachedAttributeKvEntry);
} else { } else {
missCounter.increment(); missCounter.increment();
return cacheExecutor.submit(() -> {
var cacheTransaction = cache.newTransactionForKey(attributeCacheKey); var cacheTransaction = cache.newTransactionForKey(attributeCacheKey);
try { try {
Optional<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, attributeKey); Optional<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
@ -133,8 +133,8 @@ public class CachedAttributesService implements AttributesService {
log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e); log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e);
throw e; throw e;
} }
});
} }
});
} }
@Override @Override
@ -207,7 +207,8 @@ public class CachedAttributesService implements AttributesService {
@Override @Override
public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope) { public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope) {
validate(entityId, scope); validate(entityId, scope);
return Futures.immediateFuture(attributesDao.findAll(tenantId, entityId, scope)); // We can`t watch on cache because the keys are unknown.
return jpaExecutorService.submit(() -> attributesDao.findAll(tenantId, entityId, scope));
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class Validator { public class Validator {
@ -46,6 +47,19 @@ public class Validator {
} }
} }
/**
* This method validate <code>EntityId</code> entity id. If entity id is invalid than throw
* <code>IncorrectParameterException</code> exception
*
* @param entityId the entityId
* @param errorMessageFunction the error message for exception that apply entityId
*/
public static void validateEntityId(EntityId entityId, Function<EntityId, String> errorMessageFunction) {
if (entityId == null || entityId.getId() == null) {
throw new IncorrectParameterException(errorMessageFunction.apply(entityId));
}
}
/** /**
* This method validate <code>String</code> string. If string is invalid than throw * This method validate <code>String</code> string. If string is invalid than throw
* <code>IncorrectParameterException</code> exception * <code>IncorrectParameterException</code> exception
@ -59,6 +73,18 @@ public class Validator {
} }
} }
/*
* This method validate <code>String</code> string. If string is invalid than throw
* <code>IncorrectParameterException</code> exception
*
* @param val the value
* @param errorMessageFunction the error message function that apply value
*/
public static void validateString(String val, Function<String, String> errorMessageFunction) {
if (val == null || val.isEmpty()) {
throw new IncorrectParameterException(errorMessageFunction.apply(val));
}
}
/** /**
* This method validate <code>long</code> value. If value isn't positive than throw * This method validate <code>long</code> value. If value isn't positive than throw

View File

@ -151,12 +151,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
@Override @Override
public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) { public ListenableFuture<Optional<TsKvEntry>> findLatestOpt(TenantId tenantId, EntityId entityId, String key) {
return Futures.immediateFuture(Optional.ofNullable(doFindLatest(entityId, key))); return service.submit(() -> Optional.ofNullable(doFindLatest(entityId, key)));
} }
@Override @Override
public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) { public ListenableFuture<TsKvEntry> findLatest(TenantId tenantId, EntityId entityId, String key) {
return Futures.immediateFuture(getLatestTsKvEntry(entityId, key)); return service.submit(() -> getLatestTsKvEntry(entityId, key));
} }
@Override @Override
@ -221,36 +221,29 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
} }
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
TsKvEntry latest = doFindLatest(entityId, query.getKey()); ListenableFuture<TsKvEntry> latestFuture = service.submit(() -> doFindLatest(entityId, query.getKey()));
return Futures.transformAsync(latestFuture, latest -> {
if (latest == null) { if (latest == null) {
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false)); return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), false));
} }
boolean isRemoved = false;
long ts = latest.getTs(); long ts = latest.getTs();
ListenableFuture<Boolean> removedLatestFuture;
if (ts >= query.getStartTs() && ts < query.getEndTs()) { if (ts >= query.getStartTs() && ts < query.getEndTs()) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity(); TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId()); latestEntity.setEntityId(entityId.getId());
latestEntity.setKey(getOrSaveKeyId(query.getKey())); latestEntity.setKey(getOrSaveKeyId(query.getKey()));
removedLatestFuture = service.submit(() -> {
tsKvLatestRepository.delete(latestEntity); tsKvLatestRepository.delete(latestEntity);
return true; isRemoved = true;
}); if (query.getRewriteLatestIfDeleted()) {
} else {
removedLatestFuture = Futures.immediateFuture(false);
}
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
if (isRemoved && query.getRewriteLatestIfDeleted()) {
return getNewLatestEntryFuture(tenantId, entityId, query); return getNewLatestEntryFuture(tenantId, entityId, query);
} }
}
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) { protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
return Futures.immediateFuture( return service.submit(() ->
DaoUtil.convertDataList(Lists.newArrayList( DaoUtil.convertDataList(Lists.newArrayList(
searchTsKvLatestRepository.findAllByEntityId(entityId.getId())))); searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
} }

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.dao.timeseries; package org.thingsboard.server.dao.timeseries;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -43,6 +42,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -126,18 +126,22 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override @Override
public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) { public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId); validate(entityId);
List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size()); List<ListenableFuture<TsKvEntry>> futures = new ArrayList<>(keys.size());
keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k));
keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); for (String key : keys) {
futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key));
}
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
@Override @Override
public List<TsKvEntry> findLatestSync(TenantId tenantId, EntityId entityId, Collection<String> keys) { public List<TsKvEntry> findLatestSync(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId); validate(entityId);
List<TsKvEntry> latestEntries = Lists.newArrayListWithExpectedSize(keys.size()); List<TsKvEntry> latestEntries = new ArrayList<>(keys.size());
keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); keys.forEach(key -> Validator.validateString(key, k -> "Incorrect key " + k));
keys.forEach(key -> latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key))); for (String key : keys) {
latestEntries.add(timeseriesLatestDao.findLatestSync(tenantId, entityId, key));
}
return latestEntries; return latestEntries;
} }
@ -165,7 +169,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override @Override
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
validate(entityId); validate(entityId);
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); List<ListenableFuture<Integer>> futures = new ArrayList<>(INSERTS_PER_ENTRY);
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L); saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L);
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
} }
@ -182,7 +186,7 @@ public class BaseTimeseriesService implements TimeseriesService {
private ListenableFuture<Integer> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest) { private ListenableFuture<Integer> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest) {
int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST; int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST;
List<ListenableFuture<Integer>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * inserts); List<ListenableFuture<Integer>> futures = new ArrayList<>(tsKvEntries.size() * inserts);
for (TsKvEntry tsKvEntry : tsKvEntries) { for (TsKvEntry tsKvEntry : tsKvEntries) {
if (saveLatest) { if (saveLatest) {
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
@ -195,7 +199,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override @Override
public ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) { public ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size()); List<ListenableFuture<Void>> futures = new ArrayList<>(tsKvEntries.size());
for (TsKvEntry tsKvEntry : tsKvEntries) { for (TsKvEntry tsKvEntry : tsKvEntries) {
futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
} }
@ -242,7 +246,7 @@ public class BaseTimeseriesService implements TimeseriesService {
public ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) { public ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
validate(entityId); validate(entityId);
deleteTsKvQueries.forEach(BaseTimeseriesService::validate); deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); List<ListenableFuture<TsKvLatestRemovingResult>> futures = new ArrayList<>(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery);
} }
@ -252,7 +256,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override @Override
public ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) { public ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId); validate(entityId);
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(keys.size()); List<ListenableFuture<TsKvLatestRemovingResult>> futures = new ArrayList<>(keys.size());
for (String key : keys) { for (String key : keys) {
DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
@ -281,7 +285,7 @@ public class BaseTimeseriesService implements TimeseriesService {
} }
private static void validate(EntityId entityId) { private static void validate(EntityId entityId) {
Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); Validator.validateEntityId(entityId, id -> "Incorrect entityId " + id);
} }
private void validate(ReadTsKvQuery query) { private void validate(ReadTsKvQuery query) {

View File

@ -99,10 +99,10 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
private void safePutAttributes(TbContext ctx, TbMsg msg, ObjectNode msgDataNode, T entityId) { private void safePutAttributes(TbContext ctx, TbMsg msg, ObjectNode msgDataNode, T entityId) {
Set<TbPair<String, List<String>>> failuresPairSet = ConcurrentHashMap.newKeySet(); Set<TbPair<String, List<String>>> failuresPairSet = ConcurrentHashMap.newKeySet();
var getKvEntryPairFutures = Futures.allAsList( var getKvEntryPairFutures = Futures.allAsList(
getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet),
getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresPairSet),
getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresPairSet), getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresPairSet),
getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet) getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresPairSet),
getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresPairSet)
); );
withCallback(getKvEntryPairFutures, futuresList -> { withCallback(getKvEntryPairFutures, futuresList -> {
var msgMetaData = msg.getMetaData().copy(); var msgMetaData = msg.getMetaData().copy();