EDQS: optimize attributes and latest kv

This commit is contained in:
ViacheslavKlimov 2025-02-18 17:01:39 +02:00
parent 2a72516106
commit fc5af45519
17 changed files with 125 additions and 241 deletions

View File

@ -166,7 +166,7 @@ public abstract class EdqsSyncService {
if (entityIdInfo != null) { if (entityIdInfo != null) {
process(entityIdInfo.tenantId(), RELATION, relation.toData()); process(entityIdInfo.tenantId(), RELATION, relation.toData());
} else { } else {
log.info("Relation from entity not found: " + relation.getFromId()); log.info("Relation from entity not found: " + relation.getFromType() + " " + relation.getFromId());
} }
} }
} }

View File

@ -155,7 +155,7 @@ class DefaultTelemetrySubscriptionServiceTest {
lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(Collections.emptyList())); lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(Collections.emptyList()));
// send partition change event so currentPartitions set is populated // send partition change event so currentPartitions set is populated
telemetryService.onTbApplicationEvent(new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(new QueueKey(ServiceType.TB_CORE), Set.of(tpi)))); telemetryService.onTbApplicationEvent(new PartitionChangeEvent(this, ServiceType.TB_CORE, Map.of(new QueueKey(ServiceType.TB_CORE), Set.of(tpi)), Collections.emptyMap()));
} }
@AfterEach @AfterEach

View File

@ -35,8 +35,10 @@ public class AttributeKv implements EdqsObject {
private String key; private String key;
private Long version; private Long version;
private Long lastUpdateTs; // optional (on deletion) private DataPoint dataPoint; // optional (on deletion)
private KvEntry value; // optional (on deletion)
private Long lastUpdateTs; // only for serialization
private KvEntry value; // only for serialization
public AttributeKv(EntityId entityId, AttributeScope scope, AttributeKvEntry attributeKvEntry, long version) { public AttributeKv(EntityId entityId, AttributeScope scope, AttributeKvEntry attributeKvEntry, long version) {
this.entityId = entityId; this.entityId = entityId;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.edqs.data.dp; package org.thingsboard.server.common.data.edqs;
import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DataType;

View File

@ -33,8 +33,10 @@ public class LatestTsKv implements EdqsObject {
private String key; private String key;
private Long version; private Long version;
private Long ts; // optional (on deletion) private DataPoint dataPoint; // optional (on deletion)
private KvEntry value; // optional (on deletion)
private Long ts; // only for serialization
private KvEntry value; // only for serialization
public LatestTsKv(EntityId entityId, TsKvEntry tsKvEntry, Long version) { public LatestTsKv(EntityId entityId, TsKvEntry tsKvEntry, Long version) {
this.entityId = entityId; this.entityId = entityId;

View File

@ -25,7 +25,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.permission.QueryContext; import org.thingsboard.server.common.data.permission.QueryContext;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.edqs.data.dp.BoolDataPoint; import org.thingsboard.server.edqs.data.dp.BoolDataPoint;
import org.thingsboard.server.edqs.data.dp.DataPoint; import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.edqs.data.dp.LongDataPoint; import org.thingsboard.server.edqs.data.dp.LongDataPoint;
import org.thingsboard.server.edqs.data.dp.StringDataPoint; import org.thingsboard.server.edqs.data.dp.StringDataPoint;
import org.thingsboard.server.edqs.query.DataKey; import org.thingsboard.server.edqs.query.DataKey;

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edqs.fields.DeviceFields; import org.thingsboard.server.common.data.edqs.fields.DeviceFields;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.edqs.data.dp.DataPoint; import org.thingsboard.server.common.data.edqs.DataPoint;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;

View File

@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edqs.fields.EntityFields; import org.thingsboard.server.common.data.edqs.fields.EntityFields;
import org.thingsboard.server.common.data.permission.QueryContext; import org.thingsboard.server.common.data.permission.QueryContext;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.edqs.data.dp.DataPoint; import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.edqs.query.DataKey; import org.thingsboard.server.edqs.query.DataKey;
import org.thingsboard.server.edqs.repo.TenantRepo; import org.thingsboard.server.edqs.repo.TenantRepo;

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.edqs.data.dp;
import lombok.Getter; import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.edqs.DataPoint;
@RequiredArgsConstructor @RequiredArgsConstructor
public abstract class AbstractDataPoint implements DataPoint { public abstract class AbstractDataPoint implements DataPoint {

View File

@ -19,8 +19,8 @@ import org.thingsboard.server.common.data.kv.DataType;
public class CompressedJsonDataPoint extends CompressedStringDataPoint { public class CompressedJsonDataPoint extends CompressedStringDataPoint {
public CompressedJsonDataPoint(long ts, String value) { public CompressedJsonDataPoint(long ts, byte[] compressedValue) {
super(ts, value); super(ts, compressedValue);
} }
@Override @Override

View File

@ -18,30 +18,18 @@ package org.thingsboard.server.edqs.data.dp;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.edqs.repo.TbBytePool;
import org.xerial.snappy.Snappy; import org.xerial.snappy.Snappy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class CompressedStringDataPoint extends AbstractDataPoint { public class CompressedStringDataPoint extends AbstractDataPoint {
public static final int MIN_STR_SIZE_TO_COMPRESS = 512; public static final int MIN_STR_SIZE_TO_COMPRESS = 512;
@Getter @Getter
private final byte[] value; private final byte[] compressedValue;
public static final AtomicInteger cnt = new AtomicInteger();
public static final AtomicLong uncompressedLength = new AtomicLong();
public static final AtomicLong compressedLength = new AtomicLong();
@SneakyThrows @SneakyThrows
public CompressedStringDataPoint(long ts, String value) { public CompressedStringDataPoint(long ts, byte[] compressedValue) {
super(ts); super(ts);
cnt.incrementAndGet(); this.compressedValue = compressedValue;
uncompressedLength.addAndGet(value.getBytes(StandardCharsets.UTF_8).length);
this.value = TbBytePool.intern(Snappy.compress(value));
compressedLength.addAndGet(this.value.length);
} }
@Override @Override
@ -52,13 +40,12 @@ public class CompressedStringDataPoint extends AbstractDataPoint {
@SneakyThrows @SneakyThrows
@Override @Override
public String getStr() { public String getStr() {
return Snappy.uncompressString(value); return Snappy.uncompressString(compressedValue);
} }
@SneakyThrows
@Override @Override
public String valueToString() { public String valueToString() {
return Snappy.uncompressString(value); return getStr();
} }
} }

View File

@ -1,144 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs.load;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.LatestTsKv;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.edqs.processor.EdqsConverter;
import org.thingsboard.server.edqs.repo.TenantRepo;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
@RequiredArgsConstructor
public class TenantRepoLoader {
private static final int DEVICE_COUNT = 100000;
private static final int ATTRS_PER_DEVICE = 30;
private static final int ATTRS_AVG_STR_LENGTH = 12;
private static final int ATTRS_AVG_JSON_LENGTH = 265;
private static final int TS_PER_DEVICE = 29;
private static final int TS_AVG_STR_LENGTH = 59;
private static final int TS_AVG_JSON_LENGTH = 4005;
private static final Map<DataType, Integer> ATTR_CHANCES = new HashMap<>();
private static final Random random = new Random();
static {
ATTR_CHANCES.put(DataType.BOOLEAN, 5);
ATTR_CHANCES.put(DataType.STRING, 49);
ATTR_CHANCES.put(DataType.LONG, 34);
ATTR_CHANCES.put(DataType.DOUBLE, 2);
ATTR_CHANCES.put(DataType.JSON, 10);
}
private static final Map<DataType, Integer> TS_CHANCES = new HashMap<>();
static {
TS_CHANCES.put(DataType.BOOLEAN, 6);
TS_CHANCES.put(DataType.STRING, 19);
TS_CHANCES.put(DataType.LONG, 36);
TS_CHANCES.put(DataType.DOUBLE, 32);
TS_CHANCES.put(DataType.JSON, 7);
}
@Getter
private final TenantRepo tenantRepo;
public void load() {
long ts = System.currentTimeMillis() - DEVICE_COUNT;
for (int i = 0; i < DEVICE_COUNT; i++) {
DeviceId deviceId = new DeviceId(UUID.randomUUID());
Device device = new Device();
device.setId(deviceId);
device.setCreatedTime(ts + i);
device.setName("Device " + i);
device.setLabel("Device Label" + i);
device.setType("Device Type " + (i % 100));
tenantRepo.addOrUpdate(EdqsConverter.toEntity(EntityType.DEVICE, device));
for (int j = 0; j < ATTRS_PER_DEVICE; j++) {
String key = getRandomKey();
AttributeKv attributeKv = new AttributeKv();
attributeKv.setEntityId(deviceId);
attributeKv.setScope(AttributeScope.SERVER_SCOPE);
attributeKv.setKey(key);
attributeKv.setLastUpdateTs(ts);
attributeKv.setValue(getRandomKvEntry(key, ATTR_CHANCES, ATTRS_AVG_STR_LENGTH, ATTRS_AVG_JSON_LENGTH));
tenantRepo.addOrUpdateAttribute(attributeKv);
}
for (int j = 0; j < TS_PER_DEVICE; j++) {
String key = getRandomKey();
LatestTsKv latestTsKv = new LatestTsKv();
latestTsKv.setEntityId(deviceId);
latestTsKv.setKey(key);
latestTsKv.setTs(ts);
latestTsKv.setValue(getRandomKvEntry(key, TS_CHANCES, TS_AVG_STR_LENGTH, TS_AVG_JSON_LENGTH));
tenantRepo.addOrUpdateLatestKv(latestTsKv);
}
}
}
private KvEntry getRandomKvEntry(String key, Map<DataType, Integer> chances, int strLength, int jsnLength) {
int i = random.nextInt(100);
int s = 0;
for (var pair : chances.entrySet()) {
s += pair.getValue();
if (i < s) {
switch (pair.getKey()) {
case BOOLEAN -> {
return new BooleanDataEntry(key, random.nextBoolean());
}
case LONG -> {
return new LongDataEntry(key, random.nextLong());
}
case DOUBLE -> {
return new DoubleDataEntry(key, random.nextDouble());
}
case STRING -> {
return new StringDataEntry(key, StringUtils.randomAlphanumeric(strLength));
}
case JSON -> {
return new JsonDataEntry(key, StringUtils.randomAlphanumeric(jsnLength));
}
}
}
}
throw new RuntimeException("Something went wrong");
}
private String getRandomKey() {
return RandomStringUtils.randomAlphabetic(10);
}
}

View File

@ -15,14 +15,17 @@
*/ */
package org.thingsboard.server.edqs.processor; package org.thingsboard.server.edqs.processor;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.AttributeKv; import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.common.data.edqs.EdqsObject; import org.thingsboard.server.common.data.edqs.EdqsObject;
import org.thingsboard.server.common.data.edqs.Entity; import org.thingsboard.server.common.data.edqs.Entity;
import org.thingsboard.server.common.data.edqs.LatestTsKv; import org.thingsboard.server.common.data.edqs.LatestTsKv;
@ -31,15 +34,25 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.edqs.data.dp.BoolDataPoint;
import org.thingsboard.server.edqs.data.dp.CompressedJsonDataPoint;
import org.thingsboard.server.edqs.data.dp.CompressedStringDataPoint;
import org.thingsboard.server.edqs.data.dp.DoubleDataPoint;
import org.thingsboard.server.edqs.data.dp.JsonDataPoint;
import org.thingsboard.server.edqs.data.dp.LongDataPoint;
import org.thingsboard.server.edqs.data.dp.StringDataPoint;
import org.thingsboard.server.edqs.repo.TbBytePool;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto;
import org.xerial.snappy.Snappy;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@Service @Service
@Slf4j
public class EdqsConverter { public class EdqsConverter {
private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>(); private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>();
@ -50,7 +63,6 @@ public class EdqsConverter {
converters.put(ObjectType.ATTRIBUTE_KV, new Converter<AttributeKv>() { converters.put(ObjectType.ATTRIBUTE_KV, new Converter<AttributeKv>() {
@Override @Override
public byte[] serialize(ObjectType type, AttributeKv attributeKv) { public byte[] serialize(ObjectType type, AttributeKv attributeKv) {
// TODO: some attributes may not fit into kafka
var proto = TransportProtos.AttributeKvProto.newBuilder() var proto = TransportProtos.AttributeKvProto.newBuilder()
.setEntityIdMSB(attributeKv.getEntityId().getId().getMostSignificantBits()) .setEntityIdMSB(attributeKv.getEntityId().getId().getMostSignificantBits())
.setEntityIdLSB(attributeKv.getEntityId().getId().getLeastSignificantBits()) .setEntityIdLSB(attributeKv.getEntityId().getId().getLeastSignificantBits())
@ -58,11 +70,8 @@ public class EdqsConverter {
.setScope(TransportProtos.AttributeScopeProto.forNumber(attributeKv.getScope().ordinal())) .setScope(TransportProtos.AttributeScopeProto.forNumber(attributeKv.getScope().ordinal()))
.setKey(attributeKv.getKey()) .setKey(attributeKv.getKey())
.setVersion(attributeKv.getVersion()); .setVersion(attributeKv.getVersion());
if (attributeKv.getLastUpdateTs() != null) { if (attributeKv.getLastUpdateTs() != null && attributeKv.getValue() != null) {
proto.setLastUpdateTs(attributeKv.getLastUpdateTs()); proto.setDataPoint(toDataPointProto(attributeKv.getLastUpdateTs(), attributeKv.getValue()));
}
if (attributeKv.getValue() != null) {
proto.setValue(KvProtoUtil.toKeyValueTypeProto(attributeKv.getValue()));
} }
return proto.build().toByteArray(); return proto.build().toByteArray();
} }
@ -73,14 +82,13 @@ public class EdqsConverter {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()]; AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()];
KvEntry value = proto.hasValue() ? KvProtoUtil.fromTsKvProto(proto.getValue()) : null; DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
return AttributeKv.builder() return AttributeKv.builder()
.entityId(entityId) .entityId(entityId)
.scope(scope) .scope(scope)
.key(proto.getKey()) .key(proto.getKey())
.version(proto.getVersion()) .version(proto.getVersion())
.lastUpdateTs(proto.getLastUpdateTs()) .dataPoint(dataPoint)
.value(value)
.build(); .build();
} }
}); });
@ -93,11 +101,8 @@ public class EdqsConverter {
.setEntityType(ProtoUtils.toProto(latestTsKv.getEntityId().getEntityType())) .setEntityType(ProtoUtils.toProto(latestTsKv.getEntityId().getEntityType()))
.setKey(latestTsKv.getKey()) .setKey(latestTsKv.getKey())
.setVersion(latestTsKv.getVersion()); .setVersion(latestTsKv.getVersion());
if (latestTsKv.getTs() != null) { if (latestTsKv.getTs() != null && latestTsKv.getValue() != null) {
proto.setTs(latestTsKv.getTs()); proto.setDataPoint(toDataPointProto(latestTsKv.getTs(), latestTsKv.getValue()));
}
if (latestTsKv.getValue() != null) {
proto.setValue(KvProtoUtil.toKeyValueTypeProto(latestTsKv.getValue()));
} }
return proto.build().toByteArray(); return proto.build().toByteArray();
} }
@ -107,18 +112,73 @@ public class EdqsConverter {
TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes); TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
KvEntry value = proto.hasValue() ? KvProtoUtil.fromTsKvProto(proto.getValue()) : null; DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
return LatestTsKv.builder() return LatestTsKv.builder()
.entityId(entityId) .entityId(entityId)
.key(proto.getKey()) .key(proto.getKey())
.ts(proto.getTs())
.version(proto.getVersion()) .version(proto.getVersion())
.value(value) .dataPoint(dataPoint)
.build(); .build();
} }
}); });
} }
public static DataPointProto toDataPointProto(long ts, KvEntry kvEntry) {
DataPointProto.Builder proto = DataPointProto.newBuilder();
proto.setTs(ts);
switch (kvEntry.getDataType()) {
case BOOLEAN -> proto.setBoolV(kvEntry.getBooleanValue().get());
case LONG -> proto.setLongV(kvEntry.getLongValue().get());
case DOUBLE -> proto.setDoubleV(kvEntry.getDoubleValue().get());
case STRING -> {
String strValue = kvEntry.getStrValue().get();
if (strValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) {
proto.setStringV(strValue);
} else {
proto.setCompressedStringV(ByteString.copyFrom(compress(strValue)));
}
}
case JSON -> {
String jsonValue = kvEntry.getJsonValue().get();
if (jsonValue.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) {
proto.setJsonV(jsonValue);
} else {
proto.setCompressedJsonV(ByteString.copyFrom(compress(jsonValue)));
}
}
}
return proto.build();
}
public static DataPoint fromDataPointProto(DataPointProto proto) {
long ts = proto.getTs();
if (proto.hasBoolV()) {
return new BoolDataPoint(ts, proto.getBoolV());
} else if (proto.hasLongV()) {
return new LongDataPoint(ts, proto.getLongV());
} else if (proto.hasDoubleV()) {
return new DoubleDataPoint(ts, proto.getDoubleV());
} else if (proto.hasStringV()) {
return new StringDataPoint(ts, proto.getStringV());
} else if (proto.hasCompressedStringV()) {
return new CompressedStringDataPoint(ts, TbBytePool.intern(proto.getCompressedStringV().toByteArray()));
} else if (proto.hasJsonV()) {
return new JsonDataPoint(ts, proto.getJsonV());
} else if (proto.hasCompressedJsonV()) {
return new CompressedJsonDataPoint(ts, TbBytePool.intern(proto.getCompressedJsonV().toByteArray()));
} else {
throw new IllegalArgumentException("Unsupported data point proto: " + proto);
}
}
@SneakyThrows
private static byte[] compress(String value) {
byte[] compressed = Snappy.compress(value);
// TODO: limit the size
log.debug("Compressed {} bytes to {} bytes", value.length(), compressed.length);
return compressed;
}
public static Entity toEntity(EntityType entityType, Object entity) { public static Entity toEntity(EntityType entityType, Object entity) {
Entity edqsEntity = new Entity(); Entity edqsEntity = new Entity();
edqsEntity.setType(entityType); edqsEntity.setType(entityType);

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.edqs.processor;
import lombok.Builder; import lombok.Builder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -58,6 +59,12 @@ public class EdqsProducer {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
if (t instanceof RecordTooLargeException) {
if (!log.isDebugEnabled()) {
log.warn("[{}][{}][{}] Failed to publish msg to {}", tenantId, type, key, topic, t); // not logging the whole message
return;
}
}
log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t); log.warn("[{}][{}][{}] Failed to publish msg to {}: {}", tenantId, type, key, topic, msg, t);
} }
}; };

View File

@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.edqs.AttributeKv; import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.common.data.edqs.EdqsEvent; import org.thingsboard.server.common.data.edqs.EdqsEvent;
import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.edqs.EdqsEventType;
import org.thingsboard.server.common.data.edqs.EdqsObject; import org.thingsboard.server.common.data.edqs.EdqsObject;
@ -30,7 +31,6 @@ import org.thingsboard.server.common.data.edqs.query.QueryResult;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.permission.QueryContext; import org.thingsboard.server.common.data.permission.QueryContext;
import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityCountQuery;
@ -50,14 +50,6 @@ import org.thingsboard.server.edqs.data.EntityProfileData;
import org.thingsboard.server.edqs.data.GenericData; import org.thingsboard.server.edqs.data.GenericData;
import org.thingsboard.server.edqs.data.RelationsRepo; import org.thingsboard.server.edqs.data.RelationsRepo;
import org.thingsboard.server.edqs.data.TenantData; import org.thingsboard.server.edqs.data.TenantData;
import org.thingsboard.server.edqs.data.dp.BoolDataPoint;
import org.thingsboard.server.edqs.data.dp.CompressedJsonDataPoint;
import org.thingsboard.server.edqs.data.dp.CompressedStringDataPoint;
import org.thingsboard.server.edqs.data.dp.DataPoint;
import org.thingsboard.server.edqs.data.dp.DoubleDataPoint;
import org.thingsboard.server.edqs.data.dp.JsonDataPoint;
import org.thingsboard.server.edqs.data.dp.LongDataPoint;
import org.thingsboard.server.edqs.data.dp.StringDataPoint;
import org.thingsboard.server.edqs.query.EdqsDataQuery; import org.thingsboard.server.edqs.query.EdqsDataQuery;
import org.thingsboard.server.edqs.query.EdqsQuery; import org.thingsboard.server.edqs.query.EdqsQuery;
import org.thingsboard.server.edqs.query.SortableEntityData; import org.thingsboard.server.edqs.query.SortableEntityData;
@ -240,9 +232,8 @@ public class TenantRepo {
public void addOrUpdateAttribute(AttributeKv attributeKv) { public void addOrUpdateAttribute(AttributeKv attributeKv) {
var entityData = getOrCreate(attributeKv.getEntityId()); var entityData = getOrCreate(attributeKv.getEntityId());
if (entityData != null) { if (entityData != null) {
KvEntry value = attributeKv.getValue();
Integer keyId = KeyDictionary.get(attributeKv.getKey()); Integer keyId = KeyDictionary.get(attributeKv.getKey());
boolean added = entityData.putAttr(keyId, attributeKv.getScope(), toDataPoint(attributeKv.getLastUpdateTs(), value)); boolean added = entityData.putAttr(keyId, attributeKv.getScope(), attributeKv.getDataPoint());
if (added) { if (added) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED)); edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.ATTRIBUTE_KV, EdqsEventType.UPDATED));
} }
@ -262,9 +253,8 @@ public class TenantRepo {
public void addOrUpdateLatestKv(LatestTsKv latestTsKv) { public void addOrUpdateLatestKv(LatestTsKv latestTsKv) {
var entityData = getOrCreate(latestTsKv.getEntityId()); var entityData = getOrCreate(latestTsKv.getEntityId());
if (entityData != null) { if (entityData != null) {
KvEntry value = latestTsKv.getValue();
Integer keyId = KeyDictionary.get(latestTsKv.getKey()); Integer keyId = KeyDictionary.get(latestTsKv.getKey());
boolean added = entityData.putTs(keyId, toDataPoint(latestTsKv.getTs(), value)); boolean added = entityData.putTs(keyId, latestTsKv.getDataPoint());
if (added) { if (added) {
edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED)); edqsStatsService.ifPresent(statService -> statService.reportEvent(tenantId, ObjectType.LATEST_TS_KV, EdqsEventType.UPDATED));
} }
@ -281,42 +271,12 @@ public class TenantRepo {
} }
} }
private DataPoint toDataPoint(long ts, KvEntry kvEntry) {
return switch (kvEntry.getDataType()) {
case BOOLEAN -> new BoolDataPoint(ts, kvEntry.getBooleanValue().get());
case STRING -> getStrDataPoint(ts, kvEntry.getStrValue().get());
case LONG -> new LongDataPoint(ts, kvEntry.getLongValue().get());
case DOUBLE -> new DoubleDataPoint(ts, kvEntry.getDoubleValue().get());
case JSON -> getJsonDataPoint(ts, kvEntry.getJsonValue().get());
};
}
public void processFields(EntityFields fields) { public void processFields(EntityFields fields) {
if (fields instanceof AssetFields assetFields) { if (fields instanceof AssetFields assetFields) {
assetFields.setType(TbStringPool.intern(assetFields.getType())); assetFields.setType(TbStringPool.intern(assetFields.getType()));
} }
} }
private static DataPoint getStrDataPoint(long ts, String strV) {
DataPoint dp;
if (strV.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) {
dp = new StringDataPoint(ts, strV);
} else {
dp = new CompressedStringDataPoint(ts, strV);
}
return dp;
}
private static DataPoint getJsonDataPoint(long ts, String strV) {
DataPoint dp;
if (strV.length() < CompressedStringDataPoint.MIN_STR_SIZE_TO_COMPRESS) {
dp = new JsonDataPoint(ts, strV);
} else {
dp = new CompressedJsonDataPoint(ts, strV);
}
return dp;
}
public ConcurrentMap<UUID, EntityData<?>> getEntityMap(EntityType entityType) { public ConcurrentMap<UUID, EntityData<?>> getEntityMap(EntityType entityType) {
return entityMapByType.computeIfAbsent(entityType, et -> new ConcurrentHashMap<>()); return entityMapByType.computeIfAbsent(entityType, et -> new ConcurrentHashMap<>());
} }

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.edqs.util;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edqs.DataPoint;
import org.thingsboard.server.common.data.permission.QueryContext; import org.thingsboard.server.common.data.permission.QueryContext;
import org.thingsboard.server.common.data.query.BooleanFilterPredicate; import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate; import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
@ -41,7 +42,6 @@ import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.edqs.data.EntityData; import org.thingsboard.server.edqs.data.EntityData;
import org.thingsboard.server.edqs.data.dp.DataPoint;
import org.thingsboard.server.edqs.query.DataKey; import org.thingsboard.server.edqs.query.DataKey;
import org.thingsboard.server.edqs.query.EdqsCountQuery; import org.thingsboard.server.edqs.query.EdqsCountQuery;
import org.thingsboard.server.edqs.query.EdqsDataQuery; import org.thingsboard.server.edqs.query.EdqsDataQuery;

View File

@ -178,8 +178,7 @@ message AttributeKvProto {
AttributeScopeProto scope = 4; AttributeScopeProto scope = 4;
string key = 5; string key = 5;
int64 version = 6; int64 version = 6;
int64 lastUpdateTs = 7; DataPointProto dataPoint = 7;
KeyValueProto value = 8;
} }
message TsKvProto { message TsKvProto {
@ -193,9 +192,8 @@ message LatestTsKvProto {
int64 entityIdLSB = 2; int64 entityIdLSB = 2;
EntityTypeProto entityType = 3; EntityTypeProto entityType = 3;
string key = 4; string key = 4;
int64 ts = 5; int64 version = 5;
int64 version = 6; DataPointProto dataPoint = 6;
KeyValueProto value = 7;
} }
message TsKvListProto { message TsKvListProto {
@ -203,6 +201,17 @@ message TsKvListProto {
repeated KeyValueProto kv = 2; repeated KeyValueProto kv = 2;
} }
message DataPointProto {
int64 ts = 1;
optional bool boolV = 2;
optional int64 longV = 3;
optional double doubleV = 4;
optional string stringV = 5;
optional bytes compressedStringV = 6;
optional string jsonV = 7;
optional bytes compressedJsonV = 8;
}
message DeviceInfoProto { message DeviceInfoProto {
int64 tenantIdMSB = 1; int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2; int64 tenantIdLSB = 2;