More stats for EDQS
This commit is contained in:
parent
d1f0ec5033
commit
b806a41e62
@ -125,7 +125,7 @@ public abstract class BaseEntityData<T extends EntityFields> implements EntityDa
|
|||||||
return switch (key) {
|
return switch (key) {
|
||||||
case "createdTime" -> new LongDataPoint(System.currentTimeMillis(), fields.getCreatedTime());
|
case "createdTime" -> new LongDataPoint(System.currentTimeMillis(), fields.getCreatedTime());
|
||||||
case "edgeTemplate" -> new BoolDataPoint(System.currentTimeMillis(), fields.isEdgeTemplate());
|
case "edgeTemplate" -> new BoolDataPoint(System.currentTimeMillis(), fields.isEdgeTemplate());
|
||||||
case "parentId" -> new StringDataPoint(System.currentTimeMillis(), getRelatedParentId(ctx));
|
case "parentId" -> new StringDataPoint(System.currentTimeMillis(), getRelatedParentId(ctx), false);
|
||||||
default -> new StringDataPoint(System.currentTimeMillis(), getField(key), false);
|
default -> new StringDataPoint(System.currentTimeMillis(), getField(key), false);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,10 +17,12 @@ package org.thingsboard.server.edqs.data.dp;
|
|||||||
|
|
||||||
import org.thingsboard.server.common.data.kv.DataType;
|
import org.thingsboard.server.common.data.kv.DataType;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class CompressedJsonDataPoint extends CompressedStringDataPoint {
|
public class CompressedJsonDataPoint extends CompressedStringDataPoint {
|
||||||
|
|
||||||
public CompressedJsonDataPoint(long ts, byte[] compressedValue) {
|
public CompressedJsonDataPoint(long ts, byte[] compressedValue, Function<byte[], String> uncompressor) {
|
||||||
super(ts, compressedValue);
|
super(ts, compressedValue, uncompressor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -19,17 +19,21 @@ import lombok.Getter;
|
|||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import org.thingsboard.common.util.TbBytePool;
|
import org.thingsboard.common.util.TbBytePool;
|
||||||
import org.thingsboard.server.common.data.kv.DataType;
|
import org.thingsboard.server.common.data.kv.DataType;
|
||||||
import org.xerial.snappy.Snappy;
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
public class CompressedStringDataPoint extends AbstractDataPoint {
|
public class CompressedStringDataPoint extends AbstractDataPoint {
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final byte[] compressedValue;
|
private final byte[] compressedValue;
|
||||||
|
|
||||||
|
protected final Function<byte[], String> uncompressor;
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public CompressedStringDataPoint(long ts, byte[] compressedValue) {
|
public CompressedStringDataPoint(long ts, byte[] compressedValue, Function<byte[], String> uncompressor) {
|
||||||
super(ts);
|
super(ts);
|
||||||
this.compressedValue = TbBytePool.intern(compressedValue);
|
this.compressedValue = TbBytePool.intern(compressedValue);
|
||||||
|
this.uncompressor = uncompressor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -40,7 +44,7 @@ public class CompressedStringDataPoint extends AbstractDataPoint {
|
|||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@Override
|
@Override
|
||||||
public String getStr() {
|
public String getStr() {
|
||||||
return Snappy.uncompressString(compressedValue);
|
return uncompressor.apply(compressedValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.edqs.repo;
|
package org.thingsboard.server.edqs.repo;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.common.data.ObjectType;
|
import org.thingsboard.server.common.data.ObjectType;
|
||||||
@ -40,6 +41,7 @@ import java.util.function.Predicate;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class DefaultEdqsRepository implements EdqsRepository {
|
public class DefaultEdqsRepository implements EdqsRepository {
|
||||||
|
|
||||||
|
@Getter
|
||||||
private final static ConcurrentMap<TenantId, TenantRepo> repos = new ConcurrentHashMap<>();
|
private final static ConcurrentMap<TenantId, TenantRepo> repos = new ConcurrentHashMap<>();
|
||||||
private final EdqsStatsService statsService;
|
private final EdqsStatsService statsService;
|
||||||
|
|
||||||
@ -52,6 +54,7 @@ public class DefaultEdqsRepository implements EdqsRepository {
|
|||||||
if (event.getEventType() == EdqsEventType.DELETED && event.getObjectType() == ObjectType.TENANT) {
|
if (event.getEventType() == EdqsEventType.DELETED && event.getObjectType() == ObjectType.TENANT) {
|
||||||
log.info("Tenant {} deleted", event.getTenantId());
|
log.info("Tenant {} deleted", event.getTenantId());
|
||||||
repos.remove(event.getTenantId());
|
repos.remove(event.getTenantId());
|
||||||
|
statsService.reportRemoved(ObjectType.TENANT);
|
||||||
} else {
|
} else {
|
||||||
get(event.getTenantId()).processEvent(event);
|
get(event.getTenantId()).processEvent(event);
|
||||||
}
|
}
|
||||||
@ -61,8 +64,7 @@ public class DefaultEdqsRepository implements EdqsRepository {
|
|||||||
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query, boolean ignorePermissionCheck) {
|
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query, boolean ignorePermissionCheck) {
|
||||||
long startNs = System.nanoTime();
|
long startNs = System.nanoTime();
|
||||||
long result = get(tenantId).countEntitiesByQuery(customerId, query, ignorePermissionCheck);
|
long result = get(tenantId).countEntitiesByQuery(customerId, query, ignorePermissionCheck);
|
||||||
double timingMs = (double) (System.nanoTime() - startNs) / 1000_000;
|
statsService.reportEdqsCountQuery(tenantId, query, System.nanoTime() - startNs);
|
||||||
log.info("countEntitiesByQuery done in {} ms", timingMs);
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,8 +73,7 @@ public class DefaultEdqsRepository implements EdqsRepository {
|
|||||||
EntityDataQuery query, boolean ignorePermissionCheck) {
|
EntityDataQuery query, boolean ignorePermissionCheck) {
|
||||||
long startNs = System.nanoTime();
|
long startNs = System.nanoTime();
|
||||||
var result = get(tenantId).findEntityDataByQuery(customerId, query, ignorePermissionCheck);
|
var result = get(tenantId).findEntityDataByQuery(customerId, query, ignorePermissionCheck);
|
||||||
double timingMs = (double) (System.nanoTime() - startNs) / 1000_000;
|
statsService.reportEdqsDataQuery(tenantId, query, System.nanoTime() - startNs);
|
||||||
log.info("findEntityDataByQuery done in {} ms", timingMs);
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -332,7 +332,6 @@ public class TenantRepo {
|
|||||||
|
|
||||||
public PageData<QueryResult> findEntityDataByQuery(CustomerId customerId, EntityDataQuery oldQuery, boolean ignorePermissionCheck) {
|
public PageData<QueryResult> findEntityDataByQuery(CustomerId customerId, EntityDataQuery oldQuery, boolean ignorePermissionCheck) {
|
||||||
EdqsDataQuery query = RepositoryUtils.toNewQuery(oldQuery);
|
EdqsDataQuery query = RepositoryUtils.toNewQuery(oldQuery);
|
||||||
log.info("[{}][{}] findEntityDataByQuery: {}", tenantId, customerId, query);
|
|
||||||
QueryContext ctx = buildContext(customerId, query.getEntityFilter(), ignorePermissionCheck);
|
QueryContext ctx = buildContext(customerId, query.getEntityFilter(), ignorePermissionCheck);
|
||||||
EntityQueryProcessor queryProcessor = EntityQueryProcessorFactory.create(this, ctx, query);
|
EntityQueryProcessor queryProcessor = EntityQueryProcessorFactory.create(this, ctx, query);
|
||||||
return sortAndConvert(query, queryProcessor.processQuery(), ctx);
|
return sortAndConvert(query, queryProcessor.processQuery(), ctx);
|
||||||
@ -340,7 +339,6 @@ public class TenantRepo {
|
|||||||
|
|
||||||
public long countEntitiesByQuery(CustomerId customerId, EntityCountQuery oldQuery, boolean ignorePermissionCheck) {
|
public long countEntitiesByQuery(CustomerId customerId, EntityCountQuery oldQuery, boolean ignorePermissionCheck) {
|
||||||
EdqsQuery query = RepositoryUtils.toNewQuery(oldQuery);
|
EdqsQuery query = RepositoryUtils.toNewQuery(oldQuery);
|
||||||
log.info("[{}][{}] countEntitiesByQuery: {}", tenantId, customerId, query);
|
|
||||||
QueryContext ctx = buildContext(customerId, query.getEntityFilter(), ignorePermissionCheck);
|
QueryContext ctx = buildContext(customerId, query.getEntityFilter(), ignorePermissionCheck);
|
||||||
EntityQueryProcessor queryProcessor = EntityQueryProcessorFactory.create(this, ctx, query);
|
EntityQueryProcessor queryProcessor = EntityQueryProcessorFactory.create(this, ctx, query);
|
||||||
return queryProcessor.count();
|
return queryProcessor.count();
|
||||||
|
|||||||
@ -15,78 +15,123 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.edqs.stats;
|
package org.thingsboard.server.edqs.stats;
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.common.util.TbBytePool;
|
||||||
|
import org.thingsboard.common.util.TbStringPool;
|
||||||
import org.thingsboard.server.common.data.ObjectType;
|
import org.thingsboard.server.common.data.ObjectType;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
||||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||||
import org.thingsboard.server.common.stats.EdqsStatsService;
|
import org.thingsboard.server.common.stats.EdqsStatsService;
|
||||||
|
import org.thingsboard.server.common.stats.StatsCounter;
|
||||||
import org.thingsboard.server.common.stats.StatsFactory;
|
import org.thingsboard.server.common.stats.StatsFactory;
|
||||||
import org.thingsboard.server.common.stats.StatsTimer;
|
import org.thingsboard.server.common.stats.StatsTimer;
|
||||||
import org.thingsboard.server.common.stats.StatsType;
|
import org.thingsboard.server.common.stats.StatsType;
|
||||||
|
import org.thingsboard.server.edqs.repo.DefaultEdqsRepository;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
@ConditionalOnExpression("'${queue.edqs.api.supported:true}' == 'true' && '${queue.edqs.stats.enabled:true}' == 'true'")
|
@ConditionalOnExpression("'${queue.edqs.api.supported:true}' == 'true' && '${queue.edqs.stats.enabled:true}' == 'true'")
|
||||||
public class DefaultEdqsStatsService implements EdqsStatsService {
|
public class DefaultEdqsStatsService implements EdqsStatsService {
|
||||||
|
|
||||||
private final StatsFactory statsFactory;
|
private final StatsFactory statsFactory;
|
||||||
|
|
||||||
@Value("${queue.edqs.stats.slow_query_threshold:3000}")
|
@Value("${queue.edqs.stats.slow_query_threshold}")
|
||||||
private int slowQueryThreshold;
|
private int slowQueryThreshold;
|
||||||
|
|
||||||
private final ConcurrentHashMap<ObjectType, AtomicInteger> objectCounters = new ConcurrentHashMap<>();
|
private final ConcurrentMap<ObjectType, AtomicInteger> objectCounters = new ConcurrentHashMap<>();
|
||||||
private final StatsTimer dataQueryTimer;
|
private final ConcurrentMap<String, StatsTimer> timers = new ConcurrentHashMap<>();
|
||||||
private final StatsTimer countQueryTimer;
|
private final ConcurrentMap<String, StatsCounter> counters = new ConcurrentHashMap<>();
|
||||||
|
private final ConcurrentMap<String, AtomicInteger> gauges = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private DefaultEdqsStatsService(StatsFactory statsFactory) {
|
@PostConstruct
|
||||||
this.statsFactory = statsFactory;
|
private void init() {
|
||||||
dataQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityDataQueryTimer");
|
statsFactory.createGauge(StatsType.EDQS, "stringPoolSize", TbStringPool.getPool(), Map::size);
|
||||||
countQueryTimer = statsFactory.createTimer(StatsType.EDQS, "entityCountQueryTimer");
|
statsFactory.createGauge(StatsType.EDQS, "bytePoolSize", TbBytePool.getPool(), Map::size);
|
||||||
|
statsFactory.createGauge(StatsType.EDQS, "tenantReposSize", DefaultEdqsRepository.getRepos(), Map::size);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportAdded(ObjectType objectType) {
|
public void reportAdded(ObjectType objectType) {
|
||||||
getObjectCounter(objectType).incrementAndGet();
|
getObjectGauge(objectType).incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportRemoved(ObjectType objectType) {
|
public void reportRemoved(ObjectType objectType) {
|
||||||
getObjectCounter(objectType).decrementAndGet();
|
getObjectGauge(objectType).decrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {
|
public void reportEntityDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {
|
||||||
double timingMs = timingNanos / 1000_000.0;
|
checkTiming(tenantId, query, timingNanos);
|
||||||
if (timingMs < slowQueryThreshold) {
|
getTimer("entityDataQueryTimer").record(timingNanos, TimeUnit.NANOSECONDS);
|
||||||
log.debug("[{}] Executed data query in {} ms: {}", tenantId, timingMs, query);
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Executed slow data query in {} ms: {}", tenantId, timingMs, query);
|
|
||||||
}
|
|
||||||
dataQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {
|
public void reportEntityCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {
|
||||||
double timingMs = timingNanos / 1000_000.0;
|
checkTiming(tenantId, query, timingNanos);
|
||||||
if (timingMs < slowQueryThreshold) {
|
getTimer("entityCountQueryTimer").record(timingNanos, TimeUnit.NANOSECONDS);
|
||||||
log.debug("[{}] Executed count query in {} ms: {}", tenantId, timingMs, query);
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Executed slow count query in {} ms: {}", tenantId, timingMs, query);
|
|
||||||
}
|
|
||||||
countQueryTimer.record(timingNanos, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private AtomicInteger getObjectCounter(ObjectType objectType) {
|
@Override
|
||||||
|
public void reportEdqsDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {
|
||||||
|
checkTiming(tenantId, query, timingNanos);
|
||||||
|
getTimer("edqsDataQueryTimer").record(timingNanos, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportEdqsCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {
|
||||||
|
checkTiming(tenantId, query, timingNanos);
|
||||||
|
getTimer("edqsCountQueryTimer").record(timingNanos, TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportStringCompressed() {
|
||||||
|
getCounter("stringsCompressed").increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportStringUncompressed() {
|
||||||
|
getCounter("stringsUncompressed").increment();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTiming(TenantId tenantId, EntityCountQuery query, long timingNanos) {
|
||||||
|
double timingMs = timingNanos / 1000_000.0;
|
||||||
|
String queryType = query instanceof EntityDataQuery ? "data" : "count";
|
||||||
|
if (timingMs < slowQueryThreshold) {
|
||||||
|
log.debug("[{}] Executed " + queryType + " query in {} ms: {}", tenantId, timingMs, query);
|
||||||
|
} else {
|
||||||
|
log.warn("[{}] Executed slow " + queryType + " query in {} ms: {}", tenantId, timingMs, query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private StatsTimer getTimer(String name) {
|
||||||
|
return timers.computeIfAbsent(name, __ -> statsFactory.createTimer(StatsType.EDQS, name));
|
||||||
|
}
|
||||||
|
|
||||||
|
private StatsCounter getCounter(String name) {
|
||||||
|
return counters.computeIfAbsent(name, __ -> statsFactory.createStatsCounter(StatsType.EDQS.getName(), name));
|
||||||
|
}
|
||||||
|
|
||||||
|
private AtomicInteger getGauge(String name) {
|
||||||
|
return gauges.computeIfAbsent(name, __ -> statsFactory.createGauge(StatsType.EDQS, name, new AtomicInteger()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private AtomicInteger getObjectGauge(ObjectType objectType) {
|
||||||
return objectCounters.computeIfAbsent(objectType, type ->
|
return objectCounters.computeIfAbsent(objectType, type ->
|
||||||
statsFactory.createGauge("edqsObjectsCount", new AtomicInteger(), "objectType", type.name()));
|
statsFactory.createGauge(StatsType.EDQS, "objectsCount", new AtomicInteger(), "objectType", type.name()));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
|||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||||
|
import org.thingsboard.server.common.stats.EdqsStatsService;
|
||||||
import org.thingsboard.server.common.util.ProtoUtils;
|
import org.thingsboard.server.common.util.ProtoUtils;
|
||||||
import org.thingsboard.server.edqs.data.dp.BoolDataPoint;
|
import org.thingsboard.server.edqs.data.dp.BoolDataPoint;
|
||||||
import org.thingsboard.server.edqs.data.dp.CompressedJsonDataPoint;
|
import org.thingsboard.server.edqs.data.dp.CompressedJsonDataPoint;
|
||||||
@ -56,14 +57,18 @@ import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto;
|
|||||||
import org.xerial.snappy.Snappy;
|
import org.xerial.snappy.Snappy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class EdqsConverter {
|
public class EdqsConverter {
|
||||||
|
|
||||||
|
private final EdqsStatsService edqsStatsService;
|
||||||
|
|
||||||
@Value("${queue.edqs.string_compression_length_threshold:512}")
|
@Value("${queue.edqs.string_compression_length_threshold:512}")
|
||||||
private int stringCompressionLengthThreshold;
|
private int stringCompressionLengthThreshold;
|
||||||
|
|
||||||
@ -175,32 +180,40 @@ public class EdqsConverter {
|
|||||||
if (stringV.length() < stringCompressionLengthThreshold) {
|
if (stringV.length() < stringCompressionLengthThreshold) {
|
||||||
return new StringDataPoint(ts, stringV);
|
return new StringDataPoint(ts, stringV);
|
||||||
} else {
|
} else {
|
||||||
return new CompressedStringDataPoint(ts, compress(stringV));
|
return new CompressedStringDataPoint(ts, compress(stringV), this::uncompress);
|
||||||
}
|
}
|
||||||
} else if (proto.hasCompressedStringV()) {
|
} else if (proto.hasCompressedStringV()) {
|
||||||
return new CompressedStringDataPoint(ts, proto.getCompressedStringV().toByteArray());
|
return new CompressedStringDataPoint(ts, proto.getCompressedStringV().toByteArray(), this::uncompress);
|
||||||
} else if (proto.hasJsonV()) {
|
} else if (proto.hasJsonV()) {
|
||||||
String jsonV = proto.getJsonV();
|
String jsonV = proto.getJsonV();
|
||||||
if (jsonV.length() < stringCompressionLengthThreshold) {
|
if (jsonV.length() < stringCompressionLengthThreshold) {
|
||||||
return new JsonDataPoint(ts, jsonV);
|
return new JsonDataPoint(ts, jsonV);
|
||||||
} else {
|
} else {
|
||||||
return new CompressedJsonDataPoint(ts, compress(jsonV));
|
return new CompressedJsonDataPoint(ts, compress(jsonV), this::uncompress);
|
||||||
}
|
}
|
||||||
} else if (proto.hasCompressedJsonV()) {
|
} else if (proto.hasCompressedJsonV()) {
|
||||||
return new CompressedJsonDataPoint(ts, proto.getCompressedJsonV().toByteArray());
|
return new CompressedJsonDataPoint(ts, proto.getCompressedJsonV().toByteArray(), this::uncompress);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unsupported data point proto: " + proto);
|
throw new IllegalArgumentException("Unsupported data point proto: " + proto);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
private static byte[] compress(String value) {
|
private byte[] compress(String value) {
|
||||||
byte[] compressed = Snappy.compress(value);
|
byte[] compressed = Snappy.compress(value, StandardCharsets.UTF_8);
|
||||||
// TODO: limit the size
|
log.debug("Compressed {} chars to {} bytes", value.length(), compressed.length);
|
||||||
log.debug("Compressed {} bytes to {} bytes", value.length(), compressed.length);
|
edqsStatsService.reportStringCompressed();
|
||||||
return compressed;
|
return compressed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
private String uncompress(byte[] compressed) {
|
||||||
|
String value = Snappy.uncompressString(compressed, StandardCharsets.UTF_8);
|
||||||
|
log.debug("Uncompressed {} bytes to {} chars", compressed.length, value.length());
|
||||||
|
edqsStatsService.reportStringUncompressed();
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
public static Entity toEntity(EntityType entityType, Object entity) {
|
public static Entity toEntity(EntityType entityType, Object entity) {
|
||||||
Entity edqsEntity = new Entity();
|
Entity edqsEntity = new Entity();
|
||||||
edqsEntity.setType(entityType);
|
edqsEntity.setType(entityType);
|
||||||
|
|||||||
@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.ToDoubleFunction;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public class DefaultStatsFactory implements StatsFactory {
|
public class DefaultStatsFactory implements StatsFactory {
|
||||||
@ -86,6 +87,16 @@ public class DefaultStatsFactory implements StatsFactory {
|
|||||||
return meterRegistry.gauge(key, Tags.of(tags), number);
|
return meterRegistry.gauge(key, Tags.of(tags), number);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends Number> T createGauge(StatsType statsType, String name, T number, String... tags) {
|
||||||
|
return createGauge(statsType.getName(), number, getTags(name, tags));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <S> void createGauge(StatsType statsType, String name, S stateObject, ToDoubleFunction<S> numberProvider, String... tags) {
|
||||||
|
meterRegistry.gauge(statsType.getName(), Tags.of(getTags(name, tags)), stateObject, numberProvider);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessagesStats createMessagesStats(String key) {
|
public MessagesStats createMessagesStats(String key) {
|
||||||
StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
|
StatsCounter totalCounter = createStatsCounter(key, TOTAL_MSGS);
|
||||||
|
|||||||
@ -33,9 +33,21 @@ public class DummyEdqsStatsService implements EdqsStatsService {
|
|||||||
public void reportRemoved(ObjectType objectType) {}
|
public void reportRemoved(ObjectType objectType) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {}
|
public void reportEntityDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {}
|
public void reportEntityCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportEdqsDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportEdqsCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportStringCompressed() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reportStringUncompressed() {}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,8 +26,16 @@ public interface EdqsStatsService {
|
|||||||
|
|
||||||
void reportRemoved(ObjectType objectType);
|
void reportRemoved(ObjectType objectType);
|
||||||
|
|
||||||
void reportDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos);
|
void reportEntityDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos);
|
||||||
|
|
||||||
void reportCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos);
|
void reportEntityCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos);
|
||||||
|
|
||||||
|
void reportEdqsDataQuery(TenantId tenantId, EntityDataQuery query, long timingNanos);
|
||||||
|
|
||||||
|
void reportEdqsCountQuery(TenantId tenantId, EntityCountQuery query, long timingNanos);
|
||||||
|
|
||||||
|
void reportStringCompressed();
|
||||||
|
|
||||||
|
void reportStringUncompressed();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,8 @@ package org.thingsboard.server.common.stats;
|
|||||||
|
|
||||||
import io.micrometer.core.instrument.Timer;
|
import io.micrometer.core.instrument.Timer;
|
||||||
|
|
||||||
|
import java.util.function.ToDoubleFunction;
|
||||||
|
|
||||||
public interface StatsFactory {
|
public interface StatsFactory {
|
||||||
|
|
||||||
StatsCounter createStatsCounter(String key, String statsName, String... otherTags);
|
StatsCounter createStatsCounter(String key, String statsName, String... otherTags);
|
||||||
@ -25,6 +27,10 @@ public interface StatsFactory {
|
|||||||
|
|
||||||
<T extends Number> T createGauge(String key, T number, String... tags);
|
<T extends Number> T createGauge(String key, T number, String... tags);
|
||||||
|
|
||||||
|
<T extends Number> T createGauge(StatsType statsType, String name, T number, String... tags);
|
||||||
|
|
||||||
|
<S> void createGauge(StatsType statsType, String name, S stateObject, ToDoubleFunction<S> numberProvider, String... tags);
|
||||||
|
|
||||||
MessagesStats createMessagesStats(String key);
|
MessagesStats createMessagesStats(String key);
|
||||||
|
|
||||||
Timer createTimer(String key, String... tags);
|
Timer createTimer(String key, String... tags);
|
||||||
|
|||||||
@ -16,12 +16,14 @@
|
|||||||
package org.thingsboard.common.util;
|
package org.thingsboard.common.util;
|
||||||
|
|
||||||
import com.google.common.hash.Hashing;
|
import com.google.common.hash.Hashing;
|
||||||
|
import lombok.Getter;
|
||||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
public class TbBytePool {
|
public class TbBytePool {
|
||||||
|
|
||||||
|
@Getter
|
||||||
private static final ConcurrentMap<String, byte[]> pool = new ConcurrentReferenceHashMap<>();
|
private static final ConcurrentMap<String, byte[]> pool = new ConcurrentReferenceHashMap<>();
|
||||||
|
|
||||||
public static byte[] intern(byte[] data) {
|
public static byte[] intern(byte[] data) {
|
||||||
@ -32,8 +34,4 @@ public class TbBytePool {
|
|||||||
return pool.computeIfAbsent(checksum, c -> data);
|
return pool.computeIfAbsent(checksum, c -> data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int size(){
|
|
||||||
return pool.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,12 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.common.util;
|
package org.thingsboard.common.util;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
public class TbStringPool {
|
public class TbStringPool {
|
||||||
|
|
||||||
|
@Getter
|
||||||
private static final ConcurrentMap<String, String> pool = new ConcurrentReferenceHashMap<>();
|
private static final ConcurrentMap<String, String> pool = new ConcurrentReferenceHashMap<>();
|
||||||
|
|
||||||
public static String intern(String data) {
|
public static String intern(String data) {
|
||||||
@ -30,8 +32,4 @@ public class TbStringPool {
|
|||||||
return pool.computeIfAbsent(data, str -> str);
|
return pool.computeIfAbsent(data, str -> str);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int size(){
|
|
||||||
return pool.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,7 +20,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.thingsboard.common.util.TbStopWatch;
|
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.HasCustomerId;
|
import org.thingsboard.server.common.data.HasCustomerId;
|
||||||
import org.thingsboard.server.common.data.HasEmail;
|
import org.thingsboard.server.common.data.HasEmail;
|
||||||
@ -101,7 +100,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
|||||||
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
||||||
validateEntityCountQuery(query);
|
validateEntityCountQuery(query);
|
||||||
|
|
||||||
TbStopWatch stopWatch = TbStopWatch.create();
|
long startNs = System.nanoTime();
|
||||||
Long result;
|
Long result;
|
||||||
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
||||||
EdqsRequest request = EdqsRequest.builder()
|
EdqsRequest request = EdqsRequest.builder()
|
||||||
@ -112,7 +111,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
|||||||
} else {
|
} else {
|
||||||
result = entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
|
result = entityQueryDao.countEntitiesByQuery(tenantId, customerId, query);
|
||||||
}
|
}
|
||||||
edqsStatsService.reportCountQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
|
edqsStatsService.reportEntityCountQuery(tenantId, query, System.nanoTime() - startNs);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,7 +122,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
|||||||
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
||||||
validateEntityDataQuery(query);
|
validateEntityDataQuery(query);
|
||||||
|
|
||||||
TbStopWatch stopWatch = TbStopWatch.create();
|
long startNs = System.nanoTime();
|
||||||
PageData<EntityData> result;
|
PageData<EntityData> result;
|
||||||
if (edqsApiService.isEnabled() && validForEdqs(query)) {
|
if (edqsApiService.isEnabled() && validForEdqs(query)) {
|
||||||
EdqsRequest request = EdqsRequest.builder()
|
EdqsRequest request = EdqsRequest.builder()
|
||||||
@ -146,7 +145,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
edqsStatsService.reportDataQuery(tenantId, query, stopWatch.stopAndGetTotalTimeNanos());
|
edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -76,6 +76,8 @@ queue:
|
|||||||
stats:
|
stats:
|
||||||
# Enable/disable statistics for EDQS
|
# Enable/disable statistics for EDQS
|
||||||
enabled: "${TB_EDQS_STATS_ENABLED:true}"
|
enabled: "${TB_EDQS_STATS_ENABLED:true}"
|
||||||
|
# Threshold for slow queries to log, in milliseconds
|
||||||
|
slow_query_threshold: "${TB_EDQS_SLOW_QUERY_THRESHOLD_MS:200}"
|
||||||
|
|
||||||
kafka:
|
kafka:
|
||||||
# Kafka Bootstrap nodes in "host:port" format
|
# Kafka Bootstrap nodes in "host:port" format
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user