Merge pull request #13788 from dashevchenko/latestTelemetryEdqsFix
EDQS: fixed displaying of tenant's telemetry saved by sysadmin
This commit is contained in:
commit
8ce3c1fa33
@ -79,6 +79,7 @@ import org.thingsboard.server.common.data.query.RelationsQueryFilter;
|
|||||||
import org.thingsboard.server.common.data.query.SingleEntityFilter;
|
import org.thingsboard.server.common.data.query.SingleEntityFilter;
|
||||||
import org.thingsboard.server.common.data.query.StringFilterPredicate;
|
import org.thingsboard.server.common.data.query.StringFilterPredicate;
|
||||||
import org.thingsboard.server.common.data.query.StringFilterPredicate.StringOperation;
|
import org.thingsboard.server.common.data.query.StringFilterPredicate.StringOperation;
|
||||||
|
import org.thingsboard.server.common.data.query.TsValue;
|
||||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
||||||
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
|
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
|
||||||
@ -118,8 +119,10 @@ import java.util.stream.Collectors;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.thingsboard.server.common.data.AttributeScope.SERVER_SCOPE;
|
||||||
import static org.thingsboard.server.common.data.query.EntityKeyType.ATTRIBUTE;
|
import static org.thingsboard.server.common.data.query.EntityKeyType.ATTRIBUTE;
|
||||||
import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD;
|
import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD;
|
||||||
|
import static org.thingsboard.server.common.data.query.EntityKeyType.SERVER_ATTRIBUTE;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@DaoSqlTest
|
@DaoSqlTest
|
||||||
@ -604,7 +607,7 @@ public class EntityServiceTest extends AbstractControllerTest {
|
|||||||
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
|
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
|
||||||
for (int i = 0; i < assets.size(); i++) {
|
for (int i = 0; i < assets.size(); i++) {
|
||||||
Asset asset = assets.get(i);
|
Asset asset = assets.get(i);
|
||||||
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE));
|
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), SERVER_SCOPE));
|
||||||
}
|
}
|
||||||
Futures.allAsList(attributeFutures).get();
|
Futures.allAsList(attributeFutures).get();
|
||||||
|
|
||||||
@ -1745,6 +1748,33 @@ public class EntityServiceTest extends AbstractControllerTest {
|
|||||||
deviceService.deleteDevicesByTenantId(tenantId);
|
deviceService.deleteDevicesByTenantId(tenantId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFindTenantTelemetry() {
|
||||||
|
// save timeseries by sys admin
|
||||||
|
BasicTsKvEntry timeseries = new BasicTsKvEntry(42L, new DoubleDataEntry("temperature", 45.5));
|
||||||
|
timeseriesService.save(TenantId.SYS_TENANT_ID, tenantId, timeseries);
|
||||||
|
|
||||||
|
AttributeKvEntry attr = new BaseAttributeKvEntry(new LongDataEntry("attr", 10L), 42L);
|
||||||
|
attributesService.save(TenantId.SYS_TENANT_ID, tenantId, SERVER_SCOPE, List.of(attr));
|
||||||
|
|
||||||
|
SingleEntityFilter singleEntityFilter = new SingleEntityFilter();
|
||||||
|
singleEntityFilter.setSingleEntity(AliasEntityId.fromEntityId(tenantId));
|
||||||
|
|
||||||
|
List<EntityKey> entityFields = List.of(
|
||||||
|
new EntityKey(ENTITY_FIELD, "name")
|
||||||
|
);
|
||||||
|
List<EntityKey> latestValues = List.of(
|
||||||
|
new EntityKey(EntityKeyType.TIME_SERIES, "temperature"),
|
||||||
|
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "attr")
|
||||||
|
);
|
||||||
|
|
||||||
|
EntityDataPageLink pageLink = new EntityDataPageLink(1000, 0, null, null);
|
||||||
|
EntityDataQuery query = new EntityDataQuery(singleEntityFilter, pageLink, entityFields, latestValues, null);
|
||||||
|
|
||||||
|
findByQueryAndCheckTelemetry(query, EntityKeyType.TIME_SERIES, "temperature", List.of("45.5"));
|
||||||
|
findByQueryAndCheckTelemetry(query, EntityKeyType.SERVER_ATTRIBUTE, "attr", List.of("10"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuildStringPredicateQueryOperations() throws ExecutionException, InterruptedException {
|
public void testBuildStringPredicateQueryOperations() throws ExecutionException, InterruptedException {
|
||||||
|
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|||||||
import org.springframework.context.annotation.Primary;
|
import org.springframework.context.annotation.Primary;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.ObjectType;
|
import org.thingsboard.server.common.data.ObjectType;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.edqs.AttributeKv;
|
import org.thingsboard.server.common.data.edqs.AttributeKv;
|
||||||
@ -118,7 +119,8 @@ public class BaseAttributesService implements AttributesService {
|
|||||||
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
|
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
|
||||||
for (AttributeKvEntry attribute : attributes) {
|
for (AttributeKvEntry attribute : attributes) {
|
||||||
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
|
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
|
||||||
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
|
||||||
return version;
|
return version;
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
futures.add(future);
|
futures.add(future);
|
||||||
@ -136,7 +138,8 @@ public class BaseAttributesService implements AttributesService {
|
|||||||
String key = keyVersionPair.getFirst();
|
String key = keyVersionPair.getFirst();
|
||||||
Long version = keyVersionPair.getSecond();
|
Long version = keyVersionPair.getSecond();
|
||||||
if (version != null) {
|
if (version != null) {
|
||||||
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
|
||||||
}
|
}
|
||||||
keys.add(key);
|
keys.add(key);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
import org.thingsboard.server.cache.TbCacheValueWrapper;
|
||||||
import org.thingsboard.server.cache.VersionedTbCache;
|
import org.thingsboard.server.cache.VersionedTbCache;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.ObjectType;
|
import org.thingsboard.server.common.data.ObjectType;
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.edqs.AttributeKv;
|
import org.thingsboard.server.common.data.edqs.AttributeKv;
|
||||||
@ -239,7 +240,8 @@ public class CachedAttributesService implements AttributesService {
|
|||||||
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
|
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
|
||||||
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
|
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
|
||||||
put(entityId, scope, attributeKvEntry);
|
put(entityId, scope, attributeKvEntry);
|
||||||
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
|
||||||
return version;
|
return version;
|
||||||
}, cacheExecutor);
|
}, cacheExecutor);
|
||||||
futures.add(future);
|
futures.add(future);
|
||||||
@ -263,7 +265,8 @@ public class CachedAttributesService implements AttributesService {
|
|||||||
Long version = keyVersionPair.getSecond();
|
Long version = keyVersionPair.getSecond();
|
||||||
cache.evict(new AttributeCacheKey(scope, entityId, key), version);
|
cache.evict(new AttributeCacheKey(scope, entityId, key), version);
|
||||||
if (version != null) {
|
if (version != null) {
|
||||||
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
|
||||||
}
|
}
|
||||||
return key;
|
return key;
|
||||||
}, cacheExecutor)).toList());
|
}, cacheExecutor)).toList());
|
||||||
|
|||||||
@ -196,7 +196,8 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
if (saveLatest) {
|
if (saveLatest) {
|
||||||
latestFutures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), version -> {
|
latestFutures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), version -> {
|
||||||
if (version != null) {
|
if (version != null) {
|
||||||
edqsService.onUpdate(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onUpdate(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version));
|
||||||
}
|
}
|
||||||
return version;
|
return version;
|
||||||
}, MoreExecutors.directExecutor()));
|
}, MoreExecutors.directExecutor()));
|
||||||
@ -276,7 +277,8 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
return Futures.transform(timeseriesLatestDao.removeLatest(tenantId, entityId, query), result -> {
|
return Futures.transform(timeseriesLatestDao.removeLatest(tenantId, entityId, query), result -> {
|
||||||
if (result.isRemoved()) {
|
if (result.isRemoved()) {
|
||||||
Long version = result.getVersion();
|
Long version = result.getVersion();
|
||||||
edqsService.onDelete(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version));
|
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
|
||||||
|
edqsService.onDelete(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version));
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user