Merge pull request #13141 from thingsboard/edqs-improvements
EDQS improvements
This commit is contained in:
		
						commit
						c86f4df0f5
					
				@ -1768,6 +1768,8 @@ queue:
 | 
			
		||||
    stats:
 | 
			
		||||
      # Enable/disable statistics for EDQS
 | 
			
		||||
      enabled: "${TB_EDQS_STATS_ENABLED:true}"
 | 
			
		||||
      # Threshold for slow queries to log, in milliseconds
 | 
			
		||||
      slow_query_threshold: "${TB_EDQS_SLOW_QUERY_THRESHOLD_MS:3000}"
 | 
			
		||||
  vc:
 | 
			
		||||
    # Default topic name
 | 
			
		||||
    topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
 | 
			
		||||
 | 
			
		||||
@ -36,7 +36,7 @@ public class JavaSerDesUtil {
 | 
			
		||||
        try (ObjectInputStream ois = new ObjectInputStream(is)) {
 | 
			
		||||
            return (T) ois.readObject();
 | 
			
		||||
        } catch (IOException | ClassNotFoundException e) {
 | 
			
		||||
            log.error("Error during deserialization message, [{}]", e.getMessage());
 | 
			
		||||
            log.error("Error during deserialization", e);
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -50,7 +50,7 @@ public class JavaSerDesUtil {
 | 
			
		||||
            ois.writeObject(msq);
 | 
			
		||||
            return boas.toByteArray();
 | 
			
		||||
        } catch (IOException e) {
 | 
			
		||||
            log.error("Error during serialization message, [{}]", e.getMessage());
 | 
			
		||||
            log.error("Error during serialization", e);
 | 
			
		||||
            throw new RuntimeException(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -289,7 +289,7 @@ public class FieldsUtil {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static String getText(JsonNode node) {
 | 
			
		||||
        return node != null ? node.toString() : "";
 | 
			
		||||
        return node != null && !node.isNull() ? node.toString() : "";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static UUID getCustomerId(CustomerId customerId) {
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,7 @@ package org.thingsboard.server.edqs.data.dp;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DataType;
 | 
			
		||||
import org.thingsboard.server.edqs.util.TbBytePool;
 | 
			
		||||
import org.thingsboard.common.util.TbBytePool;
 | 
			
		||||
import org.xerial.snappy.Snappy;
 | 
			
		||||
 | 
			
		||||
public class CompressedStringDataPoint extends AbstractDataPoint {
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,7 @@ package org.thingsboard.server.edqs.data.dp;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DataType;
 | 
			
		||||
import org.thingsboard.server.edqs.util.TbStringPool;
 | 
			
		||||
import org.thingsboard.common.util.TbStringPool;
 | 
			
		||||
 | 
			
		||||
public class JsonDataPoint extends AbstractDataPoint {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,7 @@ package org.thingsboard.server.edqs.data.dp;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.DataType;
 | 
			
		||||
import org.thingsboard.server.edqs.util.TbStringPool;
 | 
			
		||||
import org.thingsboard.common.util.TbStringPool;
 | 
			
		||||
 | 
			
		||||
public class StringDataPoint extends AbstractDataPoint {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -27,10 +27,9 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
import org.thingsboard.server.edqs.stats.EdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.edqs.stats.DefaultEdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
import java.util.function.Predicate;
 | 
			
		||||
@ -42,7 +41,7 @@ import java.util.function.Predicate;
 | 
			
		||||
public class DefaultEdqsRepository implements EdqsRepository {
 | 
			
		||||
 | 
			
		||||
    private final static ConcurrentMap<TenantId, TenantRepo> repos = new ConcurrentHashMap<>();
 | 
			
		||||
    private final Optional<EdqsStatsService> statsService;
 | 
			
		||||
    private final DefaultEdqsStatsService statsService;
 | 
			
		||||
 | 
			
		||||
    public TenantRepo get(TenantId tenantId) {
 | 
			
		||||
        return repos.computeIfAbsent(tenantId, id -> new TenantRepo(id, statsService));
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,6 @@ import org.thingsboard.server.common.data.edqs.EdqsEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.Entity;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.LatestTsKv;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.fields.AssetFields;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.fields.EntityFields;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.query.QueryResult;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
@ -41,6 +40,7 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
 | 
			
		||||
import org.thingsboard.server.common.data.query.TsValue;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
import org.thingsboard.server.common.stats.EdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.edqs.data.ApiUsageStateData;
 | 
			
		||||
import org.thingsboard.server.edqs.data.AssetData;
 | 
			
		||||
import org.thingsboard.server.edqs.data.CustomerData;
 | 
			
		||||
@ -55,9 +55,7 @@ import org.thingsboard.server.edqs.query.EdqsQuery;
 | 
			
		||||
import org.thingsboard.server.edqs.query.SortableEntityData;
 | 
			
		||||
import org.thingsboard.server.edqs.query.processor.EntityQueryProcessor;
 | 
			
		||||
import org.thingsboard.server.edqs.query.processor.EntityQueryProcessorFactory;
 | 
			
		||||
import org.thingsboard.server.edqs.stats.EdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.edqs.util.RepositoryUtils;
 | 
			
		||||
import org.thingsboard.server.edqs.util.TbStringPool;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
@ -65,7 +63,6 @@ import java.util.Comparator;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.TreeSet;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -96,9 +93,9 @@ public class TenantRepo {
 | 
			
		||||
    private final Lock entityUpdateLock = new ReentrantLock();
 | 
			
		||||
 | 
			
		||||
    private final TenantId tenantId;
 | 
			
		||||
    private final Optional<EdqsStatsService> edqsStatsService;
 | 
			
		||||
    private final EdqsStatsService edqsStatsService;
 | 
			
		||||
 | 
			
		||||
    public TenantRepo(TenantId tenantId, Optional<EdqsStatsService> edqsStatsService) {
 | 
			
		||||
    public TenantRepo(TenantId tenantId, EdqsStatsService edqsStatsService) {
 | 
			
		||||
        this.tenantId = tenantId;
 | 
			
		||||
        this.edqsStatsService = edqsStatsService;
 | 
			
		||||
    }
 | 
			
		||||
@ -144,9 +141,9 @@ public class TenantRepo {
 | 
			
		||||
                RelationsRepo repo = relations.computeIfAbsent(entity.getTypeGroup(), tg -> new RelationsRepo());
 | 
			
		||||
                EntityData<?> from = getOrCreate(entity.getFrom());
 | 
			
		||||
                EntityData<?> to = getOrCreate(entity.getTo());
 | 
			
		||||
                boolean added = repo.add(from, to, TbStringPool.intern(entity.getType()));
 | 
			
		||||
                boolean added = repo.add(from, to, entity.getType());
 | 
			
		||||
                if (added) {
 | 
			
		||||
                    edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.UPDATED));
 | 
			
		||||
                    edqsStatsService.reportAdded(ObjectType.RELATION);
 | 
			
		||||
                }
 | 
			
		||||
            } else if (RelationTypeGroup.DASHBOARD.equals(entity.getTypeGroup())) {
 | 
			
		||||
                if (EntityRelation.CONTAINS_TYPE.equals(entity.getType()) && entity.getFrom().getEntityType() == EntityType.CUSTOMER) {
 | 
			
		||||
@ -166,7 +163,7 @@ public class TenantRepo {
 | 
			
		||||
            if (relationsRepo != null) {
 | 
			
		||||
                boolean removed = relationsRepo.remove(entityRelation.getFrom().getId(), entityRelation.getTo().getId(), entityRelation.getType());
 | 
			
		||||
                if (removed) {
 | 
			
		||||
                    edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.RELATION, EdqsEventType.DELETED));
 | 
			
		||||
                    edqsStatsService.reportRemoved(ObjectType.RELATION);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else if (RelationTypeGroup.DASHBOARD.equals(entityRelation.getTypeGroup())) {
 | 
			
		||||
@ -188,7 +185,6 @@ public class TenantRepo {
 | 
			
		||||
            EntityType entityType = entity.getType();
 | 
			
		||||
 | 
			
		||||
            EntityData entityData = getOrCreate(entityType, entityId);
 | 
			
		||||
            processFields(fields);
 | 
			
		||||
            EntityFields oldFields = entityData.getFields();
 | 
			
		||||
            entityData.setFields(fields);
 | 
			
		||||
            if (oldFields == null) {
 | 
			
		||||
@ -225,7 +221,7 @@ public class TenantRepo {
 | 
			
		||||
                if (removed.getFields() != null) {
 | 
			
		||||
                    getEntitySet(entityType).remove(removed);
 | 
			
		||||
                }
 | 
			
		||||
                edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.DELETED));
 | 
			
		||||
                edqsStatsService.reportRemoved(entity.type());
 | 
			
		||||
 | 
			
		||||
                UUID customerId = removed.getCustomerId();
 | 
			
		||||
                if (customerId != null) {
 | 
			
		||||
@ -246,7 +242,7 @@ public class TenantRepo {
 | 
			
		||||
            Integer keyId = KeyDictionary.get(attributeKv.getKey());
 | 
			
		||||
            boolean added = entityData.putAttr(keyId, attributeKv.getScope(), attributeKv.getDataPoint());
 | 
			
		||||
            if (added) {
 | 
			
		||||
                edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED));
 | 
			
		||||
                edqsStatsService.reportAdded(ObjectType.ATTRIBUTE_KV);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -256,7 +252,7 @@ public class TenantRepo {
 | 
			
		||||
        if (entityData != null) {
 | 
			
		||||
            boolean removed = entityData.removeAttr(KeyDictionary.get(attributeKv.getKey()), attributeKv.getScope());
 | 
			
		||||
            if (removed) {
 | 
			
		||||
                edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.DELETED));
 | 
			
		||||
                edqsStatsService.reportRemoved(ObjectType.ATTRIBUTE_KV);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -267,7 +263,7 @@ public class TenantRepo {
 | 
			
		||||
            Integer keyId = KeyDictionary.get(latestTsKv.getKey());
 | 
			
		||||
            boolean added = entityData.putTs(keyId, latestTsKv.getDataPoint());
 | 
			
		||||
            if (added) {
 | 
			
		||||
                edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED));
 | 
			
		||||
                edqsStatsService.reportAdded(ObjectType.LATEST_TS_KV);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -277,17 +273,11 @@ public class TenantRepo {
 | 
			
		||||
        if (entityData != null) {
 | 
			
		||||
            boolean removed = entityData.removeTs(KeyDictionary.get(latestTsKv.getKey()));
 | 
			
		||||
            if (removed) {
 | 
			
		||||
                edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.DELETED));
 | 
			
		||||
                edqsStatsService.reportRemoved(ObjectType.LATEST_TS_KV);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void processFields(EntityFields fields) {
 | 
			
		||||
        if (fields instanceof AssetFields assetFields) {
 | 
			
		||||
            assetFields.setType(TbStringPool.intern(assetFields.getType()));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public ConcurrentMap<UUID, EntityData<?>> getEntityMap(EntityType entityType) {
 | 
			
		||||
        return entityMapByType.computeIfAbsent(entityType, et -> new ConcurrentHashMap<>());
 | 
			
		||||
    }
 | 
			
		||||
@ -301,7 +291,7 @@ public class TenantRepo {
 | 
			
		||||
        return getEntityMap(entityType).computeIfAbsent(entityId, id -> {
 | 
			
		||||
            log.debug("[{}] Adding {} {}", tenantId, entityType, id);
 | 
			
		||||
            EntityData<?> entityData = constructEntityData(entityType, entityId);
 | 
			
		||||
            edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.fromEntityType(entityType), EdqsEventType.UPDATED));
 | 
			
		||||
            edqsStatsService.reportAdded(ObjectType.fromEntityType(entityType));
 | 
			
		||||
            return entityData;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,94 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.stats;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
import org.thingsboard.server.common.stats.EdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsTimer;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsType;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@EdqsComponent
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true)
 | 
			
		||||
public class DefaultEdqsStatsService implements EdqsStatsService {
 | 
			
		||||
 | 
			
		||||
    private final StatsFactory statsFactory;
 | 
			
		||||
 | 
			
		||||
    @Value("${queue.edqs.stats.slow_query_threshold:3000}")
 | 
			
		||||
    private int slowQueryThreshold;
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentHashMap<ObjectType, AtomicInteger> objectCounters = new ConcurrentHashMap<>();
 | 
			
		||||
    private final StatsTimer dataQueryTimer;
 | 
			
		||||
    private final StatsTimer countQueryTimer;
 | 
			
		||||
 | 
			
		||||
    private DefaultEdqsStatsService(StatsFactory statsFactory) {
 | 
			
		||||
        this.statsFactory = statsFactory;
 | 
			
		||||
        dataQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityDataQueryTimer");
 | 
			
		||||
        countQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityCountQueryTimer");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportAdded(ObjectType objectType) {
 | 
			
		||||
        getObjectCounter(objectType).incrementAndGet();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportRemoved(ObjectType objectType) {
 | 
			
		||||
        getObjectCounter(objectType).decrementAndGet();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {
 | 
			
		||||
        double timingMs = timingNanos / 1000_000.0;
 | 
			
		||||
        if (timingMs < slowQueryThreshold) {
 | 
			
		||||
            log.debug("[{}] Executed data query in {} ms: {}", tenantId, timingMs, query);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.warn("[{}] Executed slow data query in {} ms: {}", tenantId, timingMs, query);
 | 
			
		||||
        }
 | 
			
		||||
        dataQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {
 | 
			
		||||
        double timingMs = timingNanos / 1000_000.0;
 | 
			
		||||
        if (timingMs < slowQueryThreshold) {
 | 
			
		||||
            log.debug("[{}] Executed count query in {} ms: {}", tenantId, timingMs, query);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.warn("[{}] Executed slow count query in {} ms: {}", tenantId, timingMs, query);
 | 
			
		||||
        }
 | 
			
		||||
        countQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private AtomicInteger getObjectCounter(ObjectType objectType) {
 | 
			
		||||
        return objectCounters.computeIfAbsent(objectType, type ->
 | 
			
		||||
                statsFactory.createGauge("edqsObjectsCount", new AtomicInteger(), "objectType", type.name()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -1,81 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.stats;
 | 
			
		||||
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
import org.thingsboard.server.common.data.edqs.EdqsEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsType;
 | 
			
		||||
import org.thingsboard.server.queue.edqs.EdqsComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@EdqsComponent
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true)
 | 
			
		||||
public class EdqsStatsService {
 | 
			
		||||
 | 
			
		||||
    private final ConcurrentHashMap<TenantId, EdqsStats> statsMap = new ConcurrentHashMap<>();
 | 
			
		||||
    private final StatsFactory statsFactory;
 | 
			
		||||
 | 
			
		||||
    public void reportEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType) {
 | 
			
		||||
        statsMap.computeIfAbsent(tenantId, id -> new EdqsStats(tenantId, statsFactory))
 | 
			
		||||
                .reportEvent(objectType, eventType);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Getter
 | 
			
		||||
    @AllArgsConstructor
 | 
			
		||||
    static class EdqsStats {
 | 
			
		||||
 | 
			
		||||
        private final TenantId tenantId;
 | 
			
		||||
        private final ConcurrentHashMap<ObjectType, AtomicInteger> entityCounters = new ConcurrentHashMap<>();
 | 
			
		||||
        private final StatsFactory statsFactory;
 | 
			
		||||
 | 
			
		||||
        private AtomicInteger getOrCreateObjectCounter(ObjectType objectType) {
 | 
			
		||||
            return entityCounters.computeIfAbsent(objectType,
 | 
			
		||||
                    type -> statsFactory.createGauge(StatsType.EDQS.getName() + "_object_count", new AtomicInteger(),
 | 
			
		||||
                            "tenantId", tenantId.toString(), "objectType", type.name()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public String toString() {
 | 
			
		||||
            return entityCounters.entrySet().stream()
 | 
			
		||||
                    .map(counters -> counters.getKey().name()+ " total = [" + counters.getValue() + "]")
 | 
			
		||||
                    .collect(Collectors.joining(", "));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        public void reportEvent(ObjectType objectType, EdqsEventType eventType) {
 | 
			
		||||
            AtomicInteger objectCounter = getOrCreateObjectCounter(objectType);
 | 
			
		||||
            if (eventType == EdqsEventType.UPDATED){
 | 
			
		||||
                objectCounter.incrementAndGet();
 | 
			
		||||
            } else if (eventType == EdqsEventType.DELETED) {
 | 
			
		||||
                objectCounter.decrementAndGet();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -17,13 +17,18 @@ package org.thingsboard.server.edqs.util;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
 | 
			
		||||
import com.fasterxml.jackson.annotation.PropertyAccessor;
 | 
			
		||||
import com.fasterxml.jackson.core.JsonParser;
 | 
			
		||||
import com.fasterxml.jackson.databind.DeserializationContext;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 | 
			
		||||
import com.fasterxml.jackson.databind.json.JsonMapper;
 | 
			
		||||
import com.fasterxml.jackson.databind.module.SimpleModule;
 | 
			
		||||
import com.google.protobuf.ByteString;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.SneakyThrows;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.TbStringPool;
 | 
			
		||||
import org.thingsboard.server.common.data.AttributeScope;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
@ -49,6 +54,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto;
 | 
			
		||||
import org.xerial.snappy.Snappy;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -188,14 +194,6 @@ public class EdqsConverter {
 | 
			
		||||
        return edqsEntity;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public EdqsObject check(ObjectType type, Object object) {
 | 
			
		||||
        if (object instanceof EdqsObject edqsObject) {
 | 
			
		||||
            return edqsObject;
 | 
			
		||||
        } else {
 | 
			
		||||
            return toEntity(type.toEntityType(), object);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @SuppressWarnings("unchecked")
 | 
			
		||||
    @SneakyThrows
 | 
			
		||||
    public <T extends EdqsObject> byte[] serialize(ObjectType type, T value) {
 | 
			
		||||
@ -220,12 +218,18 @@ public class EdqsConverter {
 | 
			
		||||
    @RequiredArgsConstructor
 | 
			
		||||
    private static class JsonConverter<T> implements Converter<T> {
 | 
			
		||||
 | 
			
		||||
        private static final SimpleModule module = new SimpleModule();
 | 
			
		||||
        private static final ObjectMapper mapper = JsonMapper.builder()
 | 
			
		||||
                .visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
 | 
			
		||||
                .visibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
 | 
			
		||||
                .visibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE)
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        static {
 | 
			
		||||
            module.addDeserializer(String.class, new InterningStringDeserializer());
 | 
			
		||||
            mapper.registerModule(module);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        private final Class<T> type;
 | 
			
		||||
 | 
			
		||||
        @SneakyThrows
 | 
			
		||||
@ -250,4 +254,17 @@ public class EdqsConverter {
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static class InterningStringDeserializer extends StdDeserializer<String> {
 | 
			
		||||
 | 
			
		||||
        public InterningStringDeserializer() {
 | 
			
		||||
            super(String.class);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public String deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
 | 
			
		||||
            return TbStringPool.intern(p.getText());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,41 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.stats;
 | 
			
		||||
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@ConditionalOnMissingBean(value = EdqsStatsService.class, ignored = DummyEdqsStatsService.class)
 | 
			
		||||
public class DummyEdqsStatsService implements EdqsStatsService {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportAdded(ObjectType objectType) {}
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportRemoved(ObjectType objectType) {}
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {}
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,33 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2025 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.stats;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.ObjectType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
 | 
			
		||||
public interface EdqsStatsService {
 | 
			
		||||
 | 
			
		||||
    void reportAdded(ObjectType objectType);
 | 
			
		||||
 | 
			
		||||
    void reportRemoved(ObjectType objectType);
 | 
			
		||||
 | 
			
		||||
    void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos);
 | 
			
		||||
 | 
			
		||||
    void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -34,10 +34,14 @@ public class StatsTimer {
 | 
			
		||||
        this.timer = micrometerTimer;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public  void record(long timeMs) {
 | 
			
		||||
    public void record(long timeMs) {
 | 
			
		||||
        record(timeMs, TimeUnit.MILLISECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void record(long timing, TimeUnit timeUnit) {
 | 
			
		||||
        count++;
 | 
			
		||||
        totalTime += timeMs;
 | 
			
		||||
        timer.record(timeMs, TimeUnit.MILLISECONDS);
 | 
			
		||||
        totalTime += timeUnit.toMillis(timing);
 | 
			
		||||
        timer.record(timing, timeUnit);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public double getAvg() {
 | 
			
		||||
@ -47,7 +51,7 @@ public class StatsTimer {
 | 
			
		||||
        return (double) totalTime / count;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public  void reset() {
 | 
			
		||||
    public void reset() {
 | 
			
		||||
        count = 0;
 | 
			
		||||
        totalTime = 0;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -13,7 +13,7 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.util;
 | 
			
		||||
package org.thingsboard.common.util;
 | 
			
		||||
 | 
			
		||||
import com.google.common.hash.Hashing;
 | 
			
		||||
import org.springframework.util.ConcurrentReferenceHashMap;
 | 
			
		||||
@ -13,7 +13,7 @@
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edqs.util;
 | 
			
		||||
package org.thingsboard.common.util;
 | 
			
		||||
 | 
			
		||||
import org.springframework.util.ConcurrentReferenceHashMap;
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.springframework.util.CollectionUtils;
 | 
			
		||||
import org.thingsboard.common.util.TbStopWatch;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.HasCustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.HasEmail;
 | 
			
		||||
@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.query.EntityTypeFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.KeyFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
 | 
			
		||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
 | 
			
		||||
import org.thingsboard.server.common.stats.EdqsStatsService;
 | 
			
		||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
@ -89,6 +91,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
 | 
			
		||||
    @Lazy
 | 
			
		||||
    private EdqsApiService edqsApiService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdqsStatsService edqsStatsService;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) {
 | 
			
		||||
        log.trace("Executing countEntitiesByQuery, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query);
 | 
			
		||||
@ -96,14 +101,19 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
 | 
			
		||||
        validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
 | 
			
		||||
        validateEntityCountQuery(query);
 | 
			
		||||
 | 
			
		||||
        TbStopWatch stopWatch = TbStopWatch.create();
 | 
			
		||||
        Long result;
 | 
			
		||||
        if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
 | 
			
		||||
            EdqsRequest request = EdqsRequest.builder()
 | 
			
		||||
                    .entityCountQuery(query)
 | 
			
		||||
                    .build();
 | 
			
		||||
            EdqsResponse response = processEdqsRequest(tenantId, customerId, request);
 | 
			
		||||
            return response.getEntityCountQueryResult();
 | 
			
		||||
            result = response.getEntityCountQueryResult();
 | 
			
		||||
        } else {
 | 
			
		||||
            result = entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
 | 
			
		||||
        }
 | 
			
		||||
        return this.entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
 | 
			
		||||
        edqsStatsService.reportCountQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -113,27 +123,31 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
 | 
			
		||||
        validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
 | 
			
		||||
        validateEntityDataQuery(query);
 | 
			
		||||
 | 
			
		||||
        TbStopWatch stopWatch = TbStopWatch.create();
 | 
			
		||||
        PageData<EntityData> result;
 | 
			
		||||
        if (edqsApiService.isEnabled() && validForEdqs(query)) {
 | 
			
		||||
            EdqsRequest request = EdqsRequest.builder()
 | 
			
		||||
                    .entityDataQuery(query)
 | 
			
		||||
                    .build();
 | 
			
		||||
            EdqsResponse response = processEdqsRequest(tenantId, customerId, request);
 | 
			
		||||
            return response.getEntityDataQueryResult();
 | 
			
		||||
            result = response.getEntityDataQueryResult();
 | 
			
		||||
        } else {
 | 
			
		||||
            if (!isValidForOptimization(query)) {
 | 
			
		||||
                result = entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
 | 
			
		||||
            } else {
 | 
			
		||||
                // 1 step - find entity data by filter and sort columns
 | 
			
		||||
                PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
 | 
			
		||||
                if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
 | 
			
		||||
                    result = entityDataByQuery;
 | 
			
		||||
                } else {
 | 
			
		||||
                    // 2 step - find entity data by entity ids from the 1st step
 | 
			
		||||
                    List<EntityData> entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
 | 
			
		||||
                    result = new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!isValidForOptimization(query)) {
 | 
			
		||||
            return this.entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 1 step - find entity data by filter and sort columns
 | 
			
		||||
        PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
 | 
			
		||||
        if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
 | 
			
		||||
            return entityDataByQuery;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // 2 step - find entity data by entity ids from the 1st step
 | 
			
		||||
        List<EntityData> result = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
 | 
			
		||||
        return new PageData<>(result, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
 | 
			
		||||
        edqsStatsService.reportDataQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean validForEdqs(EntityCountQuery query) { // for compatibility with PE
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user