From 16eb027956ffef4657a3e529e170d0e2067d17c2 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 9 Apr 2025 14:52:11 +0300 Subject: [PATCH] EDQS: configurable string compression threshold; fix stats service condition --- application/src/main/resources/thingsboard.yml | 2 ++ .../edqs/data/dp/CompressedStringDataPoint.java | 3 +-- .../server/edqs/stats/DefaultEdqsStatsService.java | 6 ++---- .../thingsboard/server/edqs/util/EdqsConverter.java | 12 ++++++++---- edqs/src/main/resources/edqs.yml | 2 ++ 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d8a22e478d..ecd4ddebb7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1765,6 +1765,8 @@ queue: max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" # Maximum timeout for requests to EDQS max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" + # Strings longer than this threshold will be compressed + string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: # Enable/disable statistics for EDQS enabled: "${TB_EDQS_STATS_ENABLED:true}" diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/CompressedStringDataPoint.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/CompressedStringDataPoint.java index f63abbbeb7..cf4267e443 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/CompressedStringDataPoint.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/CompressedStringDataPoint.java @@ -17,13 +17,12 @@ package org.thingsboard.server.edqs.data.dp; import lombok.Getter; import lombok.SneakyThrows; -import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.common.util.TbBytePool; +import org.thingsboard.server.common.data.kv.DataType; import org.xerial.snappy.Snappy; public class CompressedStringDataPoint extends AbstractDataPoint { - public static final int MIN_STR_SIZE_TO_COMPRESS = 512; @Getter private final byte[] compressedValue; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/DefaultEdqsStatsService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/DefaultEdqsStatsService.java index 7769d2bfb8..3767d0f60b 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/DefaultEdqsStatsService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/DefaultEdqsStatsService.java @@ -17,7 +17,7 @@ package org.thingsboard.server.edqs.stats; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.id.TenantId; @@ -27,16 +27,14 @@ import org.thingsboard.server.common.stats.EdqsStatsService; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsTimer; import org.thingsboard.server.common.stats.StatsType; -import org.thingsboard.server.queue.edqs.EdqsComponent; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -@EdqsComponent @Service @Slf4j -@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true) +@ConditionalOnExpression("'${queue.edqs.api.supported:true}' == 'true' && '${queue.edqs.stats.enabled:true}' == 'true'") public class DefaultEdqsStatsService implements EdqsStatsService { private final StatsFactory statsFactory; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java index bb79e0b8df..167037b889 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java @@ -27,6 +27,7 @@ import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.common.util.TbStringPool; import org.thingsboard.server.common.data.AttributeScope; @@ -63,6 +64,9 @@ import java.util.UUID; @Slf4j public class EdqsConverter { + @Value("${queue.edqs.string_compression_length_threshold:512}") + private int stringCompressionLengthThreshold; + private final Map> converters = new HashMap<>(); private final Converter defaultConverter = new JsonConverter<>(Entity.class); @@ -131,7 +135,7 @@ public class EdqsConverter { }); } - public static DataPointProto toDataPointProto(long ts, KvEntry kvEntry) { + public DataPointProto toDataPointProto(long ts, KvEntry kvEntry) { DataPointProto.Builder proto = DataPointProto.newBuilder(); proto.setTs(ts); switch (kvEntry.getDataType()) { @@ -140,7 +144,7 @@ public class EdqsConverter { case DOUBLE -> proto.setDoubleV(kvEntry.getDoubleValue().get()); case STRING -> { String strValue = kvEntry.getStrValue().get(); - if (strValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) { + if (strValue.length() < stringCompressionLengthThreshold) { proto.setStringV(strValue); } else { proto.setCompressedStringV(ByteString.copyFrom(compress(strValue))); @@ -148,7 +152,7 @@ public class EdqsConverter { } case JSON -> { String jsonValue = kvEntry.getJsonValue().get(); - if (jsonValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) { + if (jsonValue.length() < stringCompressionLengthThreshold) { proto.setJsonV(jsonValue); } else { proto.setCompressedJsonV(ByteString.copyFrom(compress(jsonValue))); @@ -158,7 +162,7 @@ public class EdqsConverter { return proto.build(); } - public static DataPoint fromDataPointProto(DataPointProto proto) { + public DataPoint fromDataPointProto(DataPointProto proto) { long ts = proto.getTs(); if (proto.hasBoolV()) { return new BoolDataPoint(ts, proto.getBoolV()); diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index 353d76391b..2c0e0b8c5d 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -71,6 +71,8 @@ queue: max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" # Maximum timeout for requests to EDQS max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" + # Strings longer than this threshold will be compressed + string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: # Enable/disable statistics for EDQS enabled: "${TB_EDQS_STATS_ENABLED:true}"