Add EDQS query stats; improvements

This commit is contained in:
ViacheslavKlimov 2025-04-09 12:29:35 +03:00
parent 7b6777df45
commit 33fd8af17e
10 changed files with 224 additions and 119 deletions

View File

@ -1768,6 +1768,8 @@ queue:
stats:
# Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}"
# Threshold for slow queries to log, in milliseconds
slow_query_threshold: "${TB_EDQS_SLOW_QUERY_THRESHOLD_MS:3000}"
vc:
# Default topic name
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"

View File

@ -36,7 +36,7 @@ public class JavaSerDesUtil {
try (ObjectInputStream ois = new ObjectInputStream(is)) {
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
log.error("Error during deserialization message, [{}]", e.getMessage());
log.error("Error during deserialization", e);
return null;
}
}
@ -50,7 +50,7 @@ public class JavaSerDesUtil {
ois.writeObject(msq);
return boas.toByteArray();
} catch (IOException e) {
log.error("Error during serialization message, [{}]", e.getMessage());
log.error("Error during serialization", e);
throw new RuntimeException(e);
}
}

View File

@ -27,10 +27,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.edqs.stats.EdqsStatsService;
import org.thingsboard.server.edqs.stats.DefaultEdqsStatsService;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
@ -42,7 +41,7 @@ import java.util.function.Predicate;
public class DefaultEdqsRepository implements EdqsRepository {
private final static ConcurrentMap<TenantId, TenantRepo> repos = new ConcurrentHashMap<>();
private final Optional<EdqsStatsService> statsService;
private final DefaultEdqsStatsService statsService;
public TenantRepo get(TenantId tenantId) {
return repos.computeIfAbsent(tenantId, id -> new TenantRepo(id, statsService));

View File

@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.edqs.data.ApiUsageStateData;
import org.thingsboard.server.edqs.data.AssetData;
import org.thingsboard.server.edqs.data.CustomerData;
@ -54,7 +55,6 @@ import org.thingsboard.server.edqs.query.EdqsQuery;
import org.thingsboard.server.edqs.query.SortableEntityData;
import org.thingsboard.server.edqs.query.processor.EntityQueryProcessor;
import org.thingsboard.server.edqs.query.processor.EntityQueryProcessorFactory;
import org.thingsboard.server.edqs.stats.EdqsStatsService;
import org.thingsboard.server.edqs.util.RepositoryUtils;
import java.util.ArrayList;
@ -63,7 +63,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@ -94,9 +93,9 @@ public class TenantRepo {
private final Lock entityUpdateLock = new ReentrantLock();
private final TenantId tenantId;
private final Optional<EdqsStatsService> edqsStatsService;
private final EdqsStatsService edqsStatsService;
public TenantRepo(TenantId tenantId, Optional<EdqsStatsService> edqsStatsService) {
public TenantRepo(TenantId tenantId, EdqsStatsService edqsStatsService) {
this.tenantId = tenantId;
this.edqsStatsService = edqsStatsService;
}
@ -144,7 +143,7 @@ public class TenantRepo {
EntityData<?> to = getOrCreate(entity.getTo());
boolean added = repo.add(from, to, entity.getType());
if (added) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.UPDATED));
edqsStatsService.reportAdded(ObjectType.RELATION);
}
} else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) {
if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) {
@ -164,7 +163,7 @@ public class TenantRepo {
if (relationsRepo != null) {
boolean removed = relationsRepo.remove(entityRelation.getFrom().getId(), entityRelation.getTo().getId(), entityRelation.getType());
if (removed) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.DELETED));
edqsStatsService.reportRemoved(ObjectType.RELATION);
}
}
} else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) {
@ -222,7 +221,7 @@ public class TenantRepo {
if (removed.getFields() != null) {
getEntitySet(entityType).remove(removed);
}
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED));
edqsStatsService.reportRemoved(entity.type());
UUID customerId = removed.getCustomerId();
if (customerId != null) {
@ -243,7 +242,7 @@ public class TenantRepo {
Integer keyId = KeyDictionary.get(attributeKv.getKey());
boolean added = entityData.putAttr(keyId, attributeKv.getScope(), attributeKv.getDataPoint());
if (added) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED));
edqsStatsService.reportAdded(ObjectType.ATTRIBUTE_KV);
}
}
}
@ -253,7 +252,7 @@ public class TenantRepo {
if (entityData != null) {
boolean removed = entityData.removeAttr(KeyDictionary.get(attributeKv.getKey()), attributeKv.getScope());
if (removed) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.DELETED));
edqsStatsService.reportRemoved(ObjectType.ATTRIBUTE_KV);
}
}
}
@ -264,7 +263,7 @@ public class TenantRepo {
Integer keyId = KeyDictionary.get(latestTsKv.getKey());
boolean added = entityData.putTs(keyId, latestTsKv.getDataPoint());
if (added) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED));
edqsStatsService.reportAdded(ObjectType.LATEST_TS_KV);
}
}
}
@ -274,7 +273,7 @@ public class TenantRepo {
if (entityData != null) {
boolean removed = entityData.removeTs(KeyDictionary.get(latestTsKv.getKey()));
if (removed) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.DELETED));
edqsStatsService.reportRemoved(ObjectType.LATEST_TS_KV);
}
}
}
@ -292,7 +291,7 @@ public class TenantRepo {
return getEntityMap(entityType).computeIfAbsent(entityId, id -> {
log.debug("[{}] Adding {} {}", tenantId, entityType, id);
EntityData<?> entityData = constructEntityData(entityType, entityId);
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.UPDATED));
edqsStatsService.reportAdded(ObjectType.fromEntityType(entityType));
return entityData;
});
}

View File

@ -0,0 +1,94 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs.stats;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsTimer;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@EdqsComponent
@Service
@Slf4j
@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true)
public class DefaultEdqsStatsService implements EdqsStatsService {
private final StatsFactory statsFactory;
@Value("${queue.edqs.stats.slow_query_threshold:3000}")
private int slowQueryThreshold;
private final ConcurrentHashMap<ObjectType, AtomicInteger> objectCounters = new ConcurrentHashMap<>();
private final StatsTimer dataQueryTimer;
private final StatsTimer countQueryTimer;
private DefaultEdqsStatsService(StatsFactory statsFactory) {
this.statsFactory = statsFactory;
dataQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityDataQueryTimer");
countQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityCountQueryTimer");
}
@Override
public void reportAdded(ObjectType objectType) {
getObjectCounter(objectType).incrementAndGet();
}
@Override
public void reportRemoved(ObjectType objectType) {
getObjectCounter(objectType).decrementAndGet();
}
@Override
public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {
double timingMs = timingNanos / 1000_000.0;
if (timingMs < slowQueryThreshold) {
log.info("[{}] Executed data query in {} ms: {}", tenantId, timingMs, query);
} else {
log.warn("[{}] Executed slow data query in {} ms: {}", tenantId, timingMs, query);
}
dataQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
}
@Override
public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {
double timingMs = timingNanos / 1000_000.0;
if (timingMs < slowQueryThreshold) {
log.info("[{}] Executed count query in {} ms: {}", tenantId, timingMs, query);
} else {
log.warn("[{}] Executed slow count query in {} ms: {}", tenantId, timingMs, query);
}
countQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
}
private AtomicInteger getObjectCounter(ObjectType objectType) {
return objectCounters.computeIfAbsent(objectType, type ->
statsFactory.createGauge("edqsObjectsCount", new AtomicInteger(), "objectType", type.name()));
}
}

View File

@ -1,81 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs.stats;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@EdqsComponent
@Service
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true)
public class EdqsStatsService {
private final ConcurrentHashMap<TenantId, EdqsStats> statsMap = new ConcurrentHashMap<>();
private final StatsFactory statsFactory;
public void reportEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType) {
statsMap.computeIfAbsent(tenantId, id -> new EdqsStats(tenantId, statsFactory))
.reportEvent(objectType, eventType);
}
@Getter
@AllArgsConstructor
static class EdqsStats {
private final TenantId tenantId;
private final ConcurrentHashMap<ObjectType, AtomicInteger> entityCounters = new ConcurrentHashMap<>();
private final StatsFactory statsFactory;
private AtomicInteger getOrCreateObjectCounter(ObjectType objectType) {
return entityCounters.computeIfAbsent(objectType,
type -> statsFactory.createGauge(StatsType.EDQS.getName() + "_object_count", new AtomicInteger(),
"tenantId", tenantId.toString(), "objectType", type.name()));
}
@Override
public String toString() {
return entityCounters.entrySet().stream()
.map(counters -> counters.getKey().name()+ " total = [" + counters.getValue() + "]")
.collect(Collectors.joining(", "));
}
public void reportEvent(ObjectType objectType, EdqsEventType eventType) {
AtomicInteger objectCounter = getOrCreateObjectCounter(objectType);
if (eventType == EdqsEventType.UPDATED){
objectCounter.incrementAndGet();
} else if (eventType == EdqsEventType.DELETED) {
objectCounter.decrementAndGet();
}
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.stats;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
@Service
@ConditionalOnMissingBean(value = EdqsStatsService.class, ignored = DummyEdqsStatsService.class)
public class DummyEdqsStatsService implements EdqsStatsService {
@Override
public void reportAdded(ObjectType objectType) {}
@Override
public void reportRemoved(ObjectType objectType) {}
@Override
public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {}
@Override
public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {}
}

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.stats;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
public interface EdqsStatsService {
void reportAdded(ObjectType objectType);
void reportRemoved(ObjectType objectType);
void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos);
void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos);
}

View File

@ -34,10 +34,14 @@ public class StatsTimer {
this.timer = micrometerTimer;
}
public void record(long timeMs) {
public void record(long timeMs) {
record(timeMs, TimeUnit.MILLISECONDS);
}
public void record(long timing, TimeUnit timeUnit) {
count++;
totalTime += timeMs;
timer.record(timeMs, TimeUnit.MILLISECONDS);
totalTime += timeUnit.toMillis(timing);
timer.record(timing, timeUnit);
}
public double getAvg() {
@ -47,7 +51,7 @@ public class StatsTimer {
return (double) totalTime / count;
}
public void reset() {
public void reset() {
count = 0;
totalTime = 0;
}

View File

@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.HasEmail;
@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import java.util.ArrayList;
@ -89,6 +91,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
@Lazy
private EdqsApiService edqsApiService;
@Autowired
private EdqsStatsService edqsStatsService;
@Override
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) {
log.trace("Executing countEntitiesByQuery, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query);
@ -96,14 +101,19 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
validateEntityCountQuery(query);
TbStopWatch stopWatch = TbStopWatch.create();
Long result;
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
EdqsRequest request = EdqsRequest.builder()
.entityCountQuery(query)
.build();
EdqsResponse response = processEdqsRequest(tenantId, customerId, request);
return response.getEntityCountQueryResult();
result = response.getEntityCountQueryResult();
} else {
result = entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
}
return this.entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
edqsStatsService.reportCountQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
return result;
}
@Override
@ -113,27 +123,31 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
validateEntityDataQuery(query);
TbStopWatch stopWatch = TbStopWatch.create();
PageData<EntityData> result;
if (edqsApiService.isEnabled() && validForEdqs(query)) {
EdqsRequest request = EdqsRequest.builder()
.entityDataQuery(query)
.build();
EdqsResponse response = processEdqsRequest(tenantId, customerId, request);
return response.getEntityDataQueryResult();
result = response.getEntityDataQueryResult();
} else {
if (!isValidForOptimization(query)) {
result = entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
} else {
// 1 step - find entity data by filter and sort columns
PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
result = entityDataByQuery;
} else {
// 2 step - find entity data by entity ids from the 1st step
List<EntityData> entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
result = new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
}
}
}
if (!isValidForOptimization(query)) {
return this.entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
}
// 1 step - find entity data by filter and sort columns
PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
return entityDataByQuery;
}
// 2 step - find entity data by entity ids from the 1st step
List<EntityData> result = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
return new PageData<>(result, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
edqsStatsService.reportDataQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
return result;
}
private boolean validForEdqs(EntityCountQuery query) { // for compatibility with PE