EDQS: configurable string compression threshold; fix stats service condition

This commit is contained in:
ViacheslavKlimov 2025-04-09 14:52:11 +03:00
parent a91d9faad9
commit 16eb027956
5 changed files with 15 additions and 10 deletions

View File

@ -1765,6 +1765,8 @@ queue:
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
# Maximum timeout for requests to EDQS # Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
# Strings longer than this threshold will be compressed
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
stats: stats:
# Enable/disable statistics for EDQS # Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}" enabled: "${TB_EDQS_STATS_ENABLED:true}"

View File

@ -17,13 +17,12 @@ package org.thingsboard.server.edqs.data.dp;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.common.util.TbBytePool; import org.thingsboard.common.util.TbBytePool;
import org.thingsboard.server.common.data.kv.DataType;
import org.xerial.snappy.Snappy; import org.xerial.snappy.Snappy;
public class CompressedStringDataPoint extends AbstractDataPoint { public class CompressedStringDataPoint extends AbstractDataPoint {
public static final int MIN_STR_SIZE_TO_COMPRESS = 512;
@Getter @Getter
private final byte[] compressedValue; private final byte[] compressedValue;

View File

@ -17,7 +17,7 @@ package org.thingsboard.server.edqs.stats;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -27,16 +27,14 @@ import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsTimer; import org.thingsboard.server.common.stats.StatsTimer;
import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.queue.edqs.EdqsComponent;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@EdqsComponent
@Service @Service
@Slf4j @Slf4j
@ConditionalOnProperty(name = "queue.edqs.stats.enabled", havingValue = "true", matchIfMissing = true) @ConditionalOnExpression("'${queue.edqs.api.supported:true}' == 'true' && '${queue.edqs.stats.enabled:true}' == 'true'")
public class DefaultEdqsStatsService implements EdqsStatsService { public class DefaultEdqsStatsService implements EdqsStatsService {
private final StatsFactory statsFactory; private final StatsFactory statsFactory;

View File

@ -27,6 +27,7 @@ import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.TbStringPool; import org.thingsboard.common.util.TbStringPool;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
@ -63,6 +64,9 @@ import java.util.UUID;
@Slf4j @Slf4j
public class EdqsConverter { public class EdqsConverter {
@Value("${queue.edqs.string_compression_length_threshold:512}")
private int stringCompressionLengthThreshold;
private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>(); private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>();
private final Converter<Entity> defaultConverter = new JsonConverter<>(Entity.class); private final Converter<Entity> defaultConverter = new JsonConverter<>(Entity.class);
@ -131,7 +135,7 @@ public class EdqsConverter {
}); });
} }
public static DataPointProto toDataPointProto(long ts, KvEntry kvEntry) { public DataPointProto toDataPointProto(long ts, KvEntry kvEntry) {
DataPointProto.Builder proto = DataPointProto.newBuilder(); DataPointProto.Builder proto = DataPointProto.newBuilder();
proto.setTs(ts); proto.setTs(ts);
switch (kvEntry.getDataType()) { switch (kvEntry.getDataType()) {
@ -140,7 +144,7 @@ public class EdqsConverter {
case DOUBLE -> proto.setDoubleV(kvEntry.getDoubleValue().get()); case DOUBLE -> proto.setDoubleV(kvEntry.getDoubleValue().get());
case STRING -> { case STRING -> {
String strValue = kvEntry.getStrValue().get(); String strValue = kvEntry.getStrValue().get();
if (strValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) { if (strValue.length() < stringCompressionLengthThreshold) {
proto.setStringV(strValue); proto.setStringV(strValue);
} else { } else {
proto.setCompressedStringV(ByteString.copyFrom(compress(strValue))); proto.setCompressedStringV(ByteString.copyFrom(compress(strValue)));
@ -148,7 +152,7 @@ public class EdqsConverter {
} }
case JSON -> { case JSON -> {
String jsonValue = kvEntry.getJsonValue().get(); String jsonValue = kvEntry.getJsonValue().get();
if (jsonValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) { if (jsonValue.length() < stringCompressionLengthThreshold) {
proto.setJsonV(jsonValue); proto.setJsonV(jsonValue);
} else { } else {
proto.setCompressedJsonV(ByteString.copyFrom(compress(jsonValue))); proto.setCompressedJsonV(ByteString.copyFrom(compress(jsonValue)));
@ -158,7 +162,7 @@ public class EdqsConverter {
return proto.build(); return proto.build();
} }
public static DataPoint fromDataPointProto(DataPointProto proto) { public DataPoint fromDataPointProto(DataPointProto proto) {
long ts = proto.getTs(); long ts = proto.getTs();
if (proto.hasBoolV()) { if (proto.hasBoolV()) {
return new BoolDataPoint(ts, proto.getBoolV()); return new BoolDataPoint(ts, proto.getBoolV());

View File

@ -71,6 +71,8 @@ queue:
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
# Maximum timeout for requests to EDQS # Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
# Strings longer than this threshold will be compressed
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
stats: stats:
# Enable/disable statistics for EDQS # Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}" enabled: "${TB_EDQS_STATS_ENABLED:true}"