From d6367c9680188b213826e3dcf252776ae90f0131 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 9 Jul 2024 15:19:01 +0200 Subject: [PATCH 1/6] added version to timeseries and attribute protos --- .../cache/VersionedCaffeineTbCache.java | 7 +- .../server/cache/VersionedRedisTbCache.java | 18 +-- .../server/cache/VersionedTbCache.java | 2 - .../common/data/kv/BaseAttributeKvEntry.java | 39 +----- .../server/common/data/kv/BasicTsKvEntry.java | 34 +----- .../server/common/util/KvProtoUtil.java | 9 +- .../server/common/util/ProtoUtils.java | 94 ++++++++------- common/proto/src/main/proto/queue.proto | 2 + .../dao/attributes/AttributeRedisCache.java | 54 +-------- .../attributes/CachedAttributesService.java | 22 ++-- .../CachedRedisSqlTimeseriesLatestDao.java | 12 +- .../dao/timeseries/TsLatestRedisCache.java | 2 +- .../attributes/BaseAttributesServiceTest.java | 23 ++-- .../sql/AttributeCacheServiceSqlTest.java | 114 ++++++++++++++++++ .../timeseries/BaseTimeseriesServiceTest.java | 22 +++- 15 files changed, 253 insertions(+), 201 deletions(-) create mode 100644 dao/src/test/java/org/thingsboard/server/dao/service/attributes/sql/AttributeCacheServiceSqlTest.java diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java index f2ac3c36d2..8ed4928d97 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedCaffeineTbCache.java @@ -45,11 +45,10 @@ public abstract class VersionedCaffeineTbCacheI8", newVersion) .. newValue redis.call('SET', key, newValueWithVersion, 'EX', expiration) end - + local function bytes_to_number(bytes) local n = 0 for i = 1, 8 do @@ -96,13 +96,15 @@ public abstract class VersionedRedisTbCache>> 32)); - result = 31 * result + kv.hashCode(); - return result; - } - - @Override - public String toString() { - return "BaseAttributeKvEntry{" + - "lastUpdateTs=" + lastUpdateTs + - ", kv=" + kv + - '}'; - } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java index ab39498ae0..82f96ea966 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BasicTsKvEntry.java @@ -16,9 +16,12 @@ package org.thingsboard.server.common.data.kv; import jakarta.validation.Valid; +import lombok.Data; + import java.util.Objects; import java.util.Optional; +@Data public class BasicTsKvEntry implements TsKvEntry { private static final int MAX_CHARS_PER_DATA_POINT = 512; protected final long ts; @@ -79,33 +82,6 @@ public class BasicTsKvEntry implements TsKvEntry { return kv.getValue(); } - @Override - public long getTs() { - return ts; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof BasicTsKvEntry)) return false; - BasicTsKvEntry that = (BasicTsKvEntry) o; - return getTs() == that.getTs() && - Objects.equals(kv, that.kv); - } - - @Override - public int hashCode() { - return Objects.hash(getTs(), kv); - } - - @Override - public String toString() { - return "BasicTsKvEntry{" + - "ts=" + ts + - ", kv=" + kv + - '}'; - } - @Override public String getValueAsString() { return kv.getValueAsString(); @@ -127,8 +103,4 @@ public class BasicTsKvEntry implements TsKvEntry { return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT); } - @Override - public Long getVersion() { - return version; - } } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java b/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java index 74674e1e45..4b206fd804 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java @@ -86,8 +86,15 @@ public class KvProtoUtil { .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); } + public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry, long version) { + return TransportProtos.TsKvProto.newBuilder() + .setTs(ts) + .setVersion(version) + .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); + } + public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) { - return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv())); + return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv()), proto.getVersion()); } public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index e3d587e915..db1c1fe2e5 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -17,7 +17,9 @@ package org.thingsboard.server.common.util; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.Nullable; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; @@ -256,42 +258,47 @@ public class ProtoUtils { if (msg.getValues() != null) { for (AttributeKvEntry attributeKvEntry : msg.getValues()) { - TransportProtos.AttributeValueProto.Builder attributeValueBuilder = TransportProtos.AttributeValueProto.newBuilder() - .setLastUpdateTs(attributeKvEntry.getLastUpdateTs()) - .setKey(attributeKvEntry.getKey()); - switch (attributeKvEntry.getDataType()) { - case BOOLEAN -> { - attributeKvEntry.getBooleanValue().ifPresent(attributeValueBuilder::setBoolV); - attributeValueBuilder.setHasV(attributeKvEntry.getBooleanValue().isPresent()); - attributeValueBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V); - } - case STRING -> { - attributeKvEntry.getStrValue().ifPresent(attributeValueBuilder::setStringV); - attributeValueBuilder.setHasV(attributeKvEntry.getStrValue().isPresent()); - attributeValueBuilder.setType(TransportProtos.KeyValueType.STRING_V); - } - case DOUBLE -> { - attributeKvEntry.getDoubleValue().ifPresent(attributeValueBuilder::setDoubleV); - attributeValueBuilder.setHasV(attributeKvEntry.getDoubleValue().isPresent()); - attributeValueBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V); - } - case LONG -> { - attributeKvEntry.getLongValue().ifPresent(attributeValueBuilder::setLongV); - attributeValueBuilder.setHasV(attributeKvEntry.getLongValue().isPresent()); - attributeValueBuilder.setType(TransportProtos.KeyValueType.LONG_V); - } - case JSON -> { - attributeKvEntry.getJsonValue().ifPresent(attributeValueBuilder::setJsonV); - attributeValueBuilder.setHasV(attributeKvEntry.getJsonValue().isPresent()); - attributeValueBuilder.setType(TransportProtos.KeyValueType.JSON_V); - } - } - builder.addValues(attributeValueBuilder.build()); + builder.addValues(toProto(attributeKvEntry)); } } return builder.build(); } + public static TransportProtos.AttributeValueProto toProto(AttributeKvEntry attributeKvEntry) { + TransportProtos.AttributeValueProto.Builder builder = TransportProtos.AttributeValueProto.newBuilder() + .setLastUpdateTs(attributeKvEntry.getLastUpdateTs()) + .setKey(attributeKvEntry.getKey()); + switch (attributeKvEntry.getDataType()) { + case BOOLEAN: + attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV); + builder.setHasV(attributeKvEntry.getBooleanValue().isPresent()); + builder.setType(TransportProtos.KeyValueType.BOOLEAN_V); + break; + case STRING: + attributeKvEntry.getStrValue().ifPresent(builder::setStringV); + builder.setHasV(attributeKvEntry.getStrValue().isPresent()); + builder.setType(TransportProtos.KeyValueType.STRING_V); + break; + case DOUBLE: + attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV); + builder.setHasV(attributeKvEntry.getDoubleValue().isPresent()); + builder.setType(TransportProtos.KeyValueType.DOUBLE_V); + break; + case LONG: + attributeKvEntry.getLongValue().ifPresent(builder::setLongV); + builder.setHasV(attributeKvEntry.getLongValue().isPresent()); + builder.setType(TransportProtos.KeyValueType.LONG_V); + break; + case JSON: + attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV); + builder.setHasV(attributeKvEntry.getJsonValue().isPresent()); + builder.setType(TransportProtos.KeyValueType.JSON_V); + break; + } + builder.setVersion(attributeKvEntry.getVersion()); + return builder.build(); + } + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { return new DeviceAttributesEventNotificationMsg( TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), @@ -500,20 +507,25 @@ public class ProtoUtils { } List result = new ArrayList<>(); for (TransportProtos.AttributeValueProto kvEntry : valuesList) { - boolean hasValue = kvEntry.getHasV(); - KvEntry entry = switch (kvEntry.getType()) { - case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null); - case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null); - case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null); - case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null); - case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null); - default -> null; - }; - result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry)); + result.add(fromProto(kvEntry)); } return result; } + public static AttributeKvEntry fromProto(TransportProtos.AttributeValueProto proto) { + boolean hasValue = proto.getHasV(); + String key = proto.getKey(); + KvEntry entry = switch (proto.getType()) { + case BOOLEAN_V -> new BooleanDataEntry(key, hasValue ? proto.getBoolV() : null); + case LONG_V -> new LongDataEntry(key, hasValue ? proto.getLongV() : null); + case DOUBLE_V -> new DoubleDataEntry(key, hasValue ? proto.getDoubleV() : null); + case STRING_V -> new StringDataEntry(key, hasValue ? proto.getStringV() : null); + case JSON_V -> new JsonDataEntry(key, hasValue ? proto.getJsonV() : null); + default -> null; + }; + return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.getVersion()); + } + public static TransportProtos.DeviceProto toProto(Device device) { var builder = TransportProtos.DeviceProto.newBuilder() .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 581b993120..9b93812708 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -157,11 +157,13 @@ message AttributeValueProto { string string_v = 7; string json_v = 8; optional string key = 9; + int64 version = 10; } message TsKvProto { int64 ts = 1; KeyValueProto kv = 2; + int64 version = 3; } message TsKvListProto { diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java index a6f21f9c3a..2b182c2b92 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributeRedisCache.java @@ -26,15 +26,8 @@ import org.thingsboard.server.cache.TbRedisSerializer; import org.thingsboard.server.cache.VersionedRedisTbCache; import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.BooleanDataEntry; -import org.thingsboard.server.common.data.kv.DoubleDataEntry; -import org.thingsboard.server.common.data.kv.JsonDataEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @Service("AttributeCache") @@ -44,54 +37,13 @@ public class AttributeRedisCache extends VersionedRedisTbCache() { @Override public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException { - AttributeValueProto.Builder builder = AttributeValueProto.newBuilder() - .setLastUpdateTs(attributeKvEntry.getLastUpdateTs()); - switch (attributeKvEntry.getDataType()) { - case BOOLEAN: - attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV); - builder.setHasV(attributeKvEntry.getBooleanValue().isPresent()); - builder.setType(KeyValueType.BOOLEAN_V); - break; - case STRING: - attributeKvEntry.getStrValue().ifPresent(builder::setStringV); - builder.setHasV(attributeKvEntry.getStrValue().isPresent()); - builder.setType(KeyValueType.STRING_V); - break; - case DOUBLE: - attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV); - builder.setHasV(attributeKvEntry.getDoubleValue().isPresent()); - builder.setType(KeyValueType.DOUBLE_V); - break; - case LONG: - attributeKvEntry.getLongValue().ifPresent(builder::setLongV); - builder.setHasV(attributeKvEntry.getLongValue().isPresent()); - builder.setType(KeyValueType.LONG_V); - break; - case JSON: - attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV); - builder.setHasV(attributeKvEntry.getJsonValue().isPresent()); - builder.setType(KeyValueType.JSON_V); - break; - - } - return builder.build().toByteArray(); + return ProtoUtils.toProto(attributeKvEntry).toByteArray(); } @Override public AttributeKvEntry deserialize(AttributeCacheKey key, byte[] bytes) throws SerializationException { try { - AttributeValueProto proto = AttributeValueProto.parseFrom(bytes); - boolean hasValue = proto.getHasV(); - KvEntry entry = switch (proto.getType()) { - case BOOLEAN_V -> new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null); - case LONG_V -> new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null); - case DOUBLE_V -> new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null); - case STRING_V -> new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null); - case JSON_V -> new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null); - default -> - throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !"); - }; - return new BaseAttributeKvEntry(proto.getLastUpdateTs(), entry); + return ProtoUtils.fromProto(AttributeValueProto.parseFrom(bytes)); } catch (InvalidProtocolBufferException e) { throw new SerializationException(e.getMessage()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index d31ff9db79..3de90355ed 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; @@ -159,7 +160,7 @@ public class CachedAttributesService implements AttributesService { log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); List result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); for (AttributeKvEntry foundInDbAttribute : result) { - put(entityId, scope, foundInDbAttribute, foundInDbAttribute.getVersion()); + put(entityId, scope, foundInDbAttribute); notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); } for (String key : notFoundAttributeKeys) { @@ -218,8 +219,7 @@ public class CachedAttributesService implements AttributesService { public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); AttributeUtils.validate(attribute, valueNoXssValidation); - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - return Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor); + return doSave(tenantId, entityId, scope, attribute); } @Override @@ -234,19 +234,25 @@ public class CachedAttributesService implements AttributesService { List> futures = new ArrayList<>(attributes.size()); for (var attribute : attributes) { - ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); - futures.add(Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor)); + futures.add(doSave(tenantId, entityId, scope, attribute)); } return Futures.allAsList(futures); } - private Long put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) { + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { + ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); + return Futures.transform(future, version -> { + put(entityId, scope, new BaseAttributeKvEntry(((BaseAttributeKvEntry)attribute).getKv(), attribute.getLastUpdateTs(), version)); + return version; + }, cacheExecutor); + } + + private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { String key = attribute.getKey(); log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute); - cache.put(new AttributeCacheKey(scope, entityId, key), attribute, version); + cache.put(new AttributeCacheKey(scope, entityId, key), attribute); log.trace("[{}][{}][{}] after cache put.", entityId, scope, key); - return version; } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index 42dc149342..0be078a89f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -29,6 +29,7 @@ import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; @@ -66,9 +67,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries @Override public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { ListenableFuture future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); - future = Futures.transform(future, x -> { - cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry, x); - return x; + future = Futures.transform(future, version -> { + cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), new BasicTsKvEntry(tsKvEntry.getTs(), ((BasicTsKvEntry) tsKvEntry).getKv(), version)); + return version; }, cacheExecutorService); if (log.isTraceEnabled()) { @@ -94,8 +95,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries if (x.isRemoved()) { TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey()); Long version = x.getVersion(); - if (x.getData() != null) { - cache.put(key, x.getData(), version); + TsKvEntry newTsKvEntry = x.getData(); + if (newTsKvEntry != null) { + cache.put(key, new BasicTsKvEntry(newTsKvEntry.getTs(), ((BasicTsKvEntry) newTsKvEntry).getKv(), version)); } else { cache.evict(key, version); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java index 851d66ae0c..241132b4c8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsLatestRedisCache.java @@ -39,7 +39,7 @@ public class TsLatestRedisCache extends VersionedRedisTbCache() { @Override public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException { - return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry).toByteArray(); + return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry, tsKvEntry.getVersion()).toByteArray(); } @Override diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java index 0603f4e3f6..2d648f9db3 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java @@ -26,7 +26,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.dao.attributes.AttributeCacheKey; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.service.AbstractServiceTest; @@ -74,7 +72,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attr)).get(); Optional saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attr.getKey()).get(); Assert.assertTrue(saved.isPresent()); - Assert.assertEquals(attr, saved.get()); + equalsIgnoreVersion(attr, saved.get()); } @Test @@ -87,14 +85,15 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { Optional saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); Assert.assertTrue(saved.isPresent()); - Assert.assertEquals(attrOld, saved.get()); + equalsIgnoreVersion(attrOld, saved.get()); KvEntry attrNewValue = new StringDataEntry("attribute1", "value2"); AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L); attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attrNew)).get(); saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); - Assert.assertEquals(attrNew, saved.get()); + Assert.assertTrue(saved.isPresent()); + equalsIgnoreVersion(attrNew, saved.get()); } @Test @@ -117,8 +116,8 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { Assert.assertNotNull(saved); Assert.assertEquals(2, saved.size()); - Assert.assertEquals(attrANew, saved.get(0)); - Assert.assertEquals(attrBNew, saved.get(1)); + equalsIgnoreVersion(attrANew, saved.get(0)); + equalsIgnoreVersion(attrBNew, saved.get(1)); } @Test @@ -253,6 +252,11 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { })); futures.add(pool.submit(() -> saveAttribute(tenantId, deviceId, scope, key, NEW_VALUE))); Futures.allAsList(futures).get(10, TimeUnit.SECONDS); + + String attributeValue = getAttributeValue(tenantId, deviceId, scope, key); + if (!NEW_VALUE.equals(attributeValue)) { + System.out.println(); + } Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key)); } @@ -309,5 +313,10 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { } } + private void equalsIgnoreVersion(AttributeKvEntry expected, AttributeKvEntry actual) { + Assert.assertEquals(expected.getKey(), actual.getKey()); + Assert.assertEquals(expected.getValue(), actual.getValue()); + Assert.assertEquals(expected.getLastUpdateTs(), actual.getLastUpdateTs()); + } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/sql/AttributeCacheServiceSqlTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/sql/AttributeCacheServiceSqlTest.java new file mode 100644 index 0000000000..95e3e15adc --- /dev/null +++ b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/sql/AttributeCacheServiceSqlTest.java @@ -0,0 +1,114 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.service.attributes.sql; + +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.cache.TbCacheValueWrapper; +import org.thingsboard.server.cache.VersionedTbCache; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.dao.attributes.AttributeCacheKey; +import org.thingsboard.server.dao.service.AbstractServiceTest; +import org.thingsboard.server.dao.service.DaoSqlTest; + +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +@DaoSqlTest +public class AttributeCacheServiceSqlTest extends AbstractServiceTest { + + private static final String TEST_KEY = "key"; + private static final String TEST_VALUE = "value"; + private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID()); + + @Autowired + VersionedTbCache cache; + + @Test + public void testPutAndGet() { + AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); + AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); + cache.put(testKey, testValue); + + TbCacheValueWrapper wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertEquals(testValue, wrapper.get()); + + AttributeKvEntry testValue2 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 2L); + cache.put(testKey, testValue2); + + wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertEquals(testValue2, wrapper.get()); + + AttributeKvEntry testValue3 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 0L); + cache.put(testKey, testValue3); + + wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertEquals(testValue2, wrapper.get()); + + cache.evict(testKey); + } + + @Test + public void testEvictWithVersion() { + AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); + AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); + cache.put(testKey, testValue); + + TbCacheValueWrapper wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertEquals(testValue, wrapper.get()); + + cache.evict(testKey, 2L); + + wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertNull(wrapper.get()); + + cache.evict(testKey); + } + + @Test + public void testEvict() { + AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY); + AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L); + cache.put(testKey, testValue); + + TbCacheValueWrapper wrapper = cache.get(testKey); + assertNotNull(wrapper); + + assertEquals(testValue, wrapper.get()); + + cache.evict(testKey); + + wrapper = cache.get(testKey); + assertNull(wrapper); + } +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index e4cc64d47c..443052240a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -137,7 +137,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { toTsEntry(TS, booleanKvEntry)); Collections.sort(expected, Comparator.comparing(KvEntry::getKey)); - assertEquals(expected, tsList); + for (int i = 0; i < expected.size(); i++) { + var expectedEntry = expected.get(i); + var actualEntry = tsList.get(i); + equalsIgnoreVersion(expectedEntry, actualEntry); + + } } private EntityView saveAndCreateEntityView(DeviceId deviceId, List timeseries) { @@ -160,7 +165,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); Assert.assertEquals(1, entries.size()); - Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); + equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entries.get(0)); } @Test @@ -176,7 +181,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertThat(entryOpt).isNotNull().isPresent(); - Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null)); + equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get()); } @Test @@ -186,7 +191,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertThat(entryOpt).isNotNull().isPresent(); - Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null)); + equalsIgnoreVersion(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.get()); } public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception { @@ -209,7 +214,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Optional entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); assertThat(entryOpt).isNotNull().isPresent(); - Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get()); + equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get()); } @Test @@ -239,7 +244,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { List entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); Assert.assertEquals(1, entries.size()); - Assert.assertEquals(toTsEntry(TS - 1, stringKvEntry), entries.get(0)); + equalsIgnoreVersion(toTsEntry(TS - 1, stringKvEntry), entries.get(0)); } @Test @@ -794,5 +799,10 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { return new BasicTsKvEntry(ts, entry); } + private static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) { + assertEquals(expected.getKey(), actual.getKey()); + assertEquals(expected.getValue(), actual.getValue()); + assertEquals(expected.getTs(), actual.getTs()); + } } From 2336cba89a61b5632742c80a5fc7d99774995952 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 9 Jul 2024 16:50:07 +0200 Subject: [PATCH 2/6] version in proto should be optional --- .../server/common/util/KvProtoUtil.java | 15 ++++++++++----- .../server/common/util/ProtoUtils.java | 8 ++++++-- common/proto/src/main/proto/queue.proto | 4 ++-- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java b/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java index 4b206fd804..1be79a28cc 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/KvProtoUtil.java @@ -86,15 +86,20 @@ public class KvProtoUtil { .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); } - public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry, long version) { - return TransportProtos.TsKvProto.newBuilder() + public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry, Long version) { + var builder = TransportProtos.TsKvProto.newBuilder() .setTs(ts) - .setVersion(version) - .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); + .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)); + + if (version != null) { + builder.setVersion(version); + } + + return builder.build(); } public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) { - return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv()), proto.getVersion()); + return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv()), proto.hasVersion() ? proto.getVersion() : null); } public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) { diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index db1c1fe2e5..b9377ac215 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -295,7 +295,11 @@ public class ProtoUtils { builder.setType(TransportProtos.KeyValueType.JSON_V); break; } - builder.setVersion(attributeKvEntry.getVersion()); + + if (attributeKvEntry.getVersion() != null) { + builder.setVersion(attributeKvEntry.getVersion()); + } + return builder.build(); } @@ -523,7 +527,7 @@ public class ProtoUtils { case JSON_V -> new JsonDataEntry(key, hasValue ? proto.getJsonV() : null); default -> null; }; - return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.getVersion()); + return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.hasVersion() ? proto.getVersion() : null); } public static TransportProtos.DeviceProto toProto(Device device) { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 9b93812708..3317391f01 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -157,13 +157,13 @@ message AttributeValueProto { string string_v = 7; string json_v = 8; optional string key = 9; - int64 version = 10; + optional int64 version = 10; } message TsKvProto { int64 ts = 1; KeyValueProto kv = 2; - int64 version = 3; + optional int64 version = 3; } message TsKvListProto { From 164185c0f34c095c8e2c24b598dd4b8b6331faec Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 9 Jul 2024 17:17:23 +0200 Subject: [PATCH 3/6] added upgrade script for attributes and latest ts --- .../main/data/upgrade/3.7.0/schema_update.sql | 23 +++++++++++++++++++ .../install/ThingsboardInstallService.java | 4 ++++ .../install/SqlDatabaseUpgradeService.java | 3 +++ .../update/DefaultCacheCleanupService.java | 6 ++++- 4 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 application/src/main/data/upgrade/3.7.0/schema_update.sql diff --git a/application/src/main/data/upgrade/3.7.0/schema_update.sql b/application/src/main/data/upgrade/3.7.0/schema_update.sql new file mode 100644 index 0000000000..0e10667545 --- /dev/null +++ b/application/src/main/data/upgrade/3.7.0/schema_update.sql @@ -0,0 +1,23 @@ +-- +-- Copyright © 2016-2024 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +-- UPDATE PUBLIC CUSTOMERS START + +CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000; +CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000; + +ALTER TABLE attribute_kv ADD COLUMN version bigint default 0; +ALTER TABLE ts_kv_latest ADD COLUMN version bigint default 0; diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 4bef0d420a..85eb64ffba 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -137,6 +137,10 @@ public class ThingsboardInstallService { entityDatabaseSchemaService.createCustomerTitleUniqueConstraintIfNotExists(); systemDataLoaderService.updateDefaultNotificationConfigs(false); systemDataLoaderService.updateSecuritySettings(); + break; + case "3.7.0": + log.info("Upgrading ThingsBoard from version 3.7.0 to 3.7.1 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("3.7.0"); //TODO DON'T FORGET to update switch statement in the CacheCleanupService if you need to clear the cache break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 86d922c305..000c0536cc 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -121,6 +121,9 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService case "3.6.4": updateSchema("3.6.4", 3006004, "3.7.0", 3007000, null); break; + case "3.7.0": + updateSchema("3.7.0", 3007000, "3.7.1", 3007001, null); + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java index 7fd92bc5c6..0085df35d7 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java @@ -61,6 +61,10 @@ public class DefaultCacheCleanupService implements CacheCleanupService { log.info("Clearing cache to upgrade from version 3.6.4 to 3.7.0"); clearAll(); break; + case "3.7.0": + log.info("Clearing cache to upgrade from version 3.7.0 to 3.7.1"); + clearAll(); + break; default: //Do nothing, since cache cleanup is optional. } @@ -81,7 +85,7 @@ public class DefaultCacheCleanupService implements CacheCleanupService { if (redisTemplate.isPresent()) { log.info("Flushing all caches"); redisTemplate.get().execute((RedisCallback) connection -> { - connection.flushAll(); + connection.serverCommands().flushAll(); return null; }); return; From 9515bdea1d8c519cdda71ffe741bd2070ad00c23 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 9 Jul 2024 17:33:00 +0200 Subject: [PATCH 4/6] updated LUA SHA --- .../org/thingsboard/server/cache/VersionedRedisTbCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java index d0b3afec9c..3ddcc53f38 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java @@ -68,7 +68,7 @@ public abstract class VersionedRedisTbCache valueSerializer) { super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); From 7945669303cf45b9a336a5aa44e0edf9f862239d Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 10 Jul 2024 12:22:08 +0200 Subject: [PATCH 5/6] fixed sparkplug tests --- ...ctMqttV5ClientSparkplugConnectionTest.java | 21 +++++++++++++------ ...actMqttV5ClientSparkplugTelemetryTest.java | 17 +++++++++++++-- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index 603b42f6b6..561594f0bf 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -63,7 +63,9 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra return finalFuture.get().get().isPresent(); }); TsKvEntry actualTsKvEntry = finalFuture.get().get().get(); - Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry); + Assert.assertEquals(expectedTsKvEntry.getKey(), actualTsKvEntry.getKey()); + Assert.assertEquals(expectedTsKvEntry.getValue(), actualTsKvEntry.getValue()); + Assert.assertEquals(expectedTsKvEntry.getTs(), actualTsKvEntry.getTs()); } protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception { @@ -95,20 +97,27 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name())); - AtomicReference>> finalFuture = new AtomicReference<>(); await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) .atMost(40, TimeUnit.SECONDS) .until(() -> { - finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); - return finalFuture.get().get().contains(tsKvEntry); + var foundEntry = tsService.findAllLatest(tenantId, savedGateway.getId()).get().stream() + .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey())) + .filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue())) + .filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs()) + .findFirst(); + return foundEntry.isPresent(); }); for (Device device : devices) { await(alias + messageName(STATE) + ", device: " + device.getName()) .atMost(40, TimeUnit.SECONDS) .until(() -> { - finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); - return finalFuture.get().get().contains(tsKvEntry); + var foundEntry = tsService.findAllLatest(tenantId, device.getId()).get().stream() + .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey())) + .filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue())) + .filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs()) + .findFirst(); + return foundEntry.isPresent(); }); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index af34e8bf0e..c637ce5bb1 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -78,7 +78,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); }); - Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); + Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry)); } protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple() throws Exception { @@ -107,7 +107,20 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); }); - Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); + Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry)); } + private static boolean containsIgnoreVersion(List expected, List actual) { + for (TsKvEntry actualEntry : actual) { + var found = expected.stream() + .filter(tsKv -> tsKv.getKey().equals(actualEntry.getKey())) + .filter(tsKv -> tsKv.getValue().equals(actualEntry.getValue())) + .filter(tsKv -> tsKv.getTs() == actualEntry.getTs()) + .findFirst(); + if (found.isEmpty()) { + return false; + } + } + return true; + } } From 31d2d14f6069735efb3455608b1822216f162c58 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 16 Jul 2024 10:29:58 +0300 Subject: [PATCH 6/6] Cache transaction support for versioned entities --- .../main/data/upgrade/3.7.0/schema_update.sql | 4 +- .../cache/CaffeineTbTransactionalCache.java | 5 +++ .../server/cache/RedisTbCacheTransaction.java | 2 +- .../cache/RedisTbTransactionalCache.java | 13 ++++-- .../server/cache/TbTransactionalCache.java | 6 ++- .../server/cache/VersionedRedisTbCache.java | 40 +++++++++++-------- .../server/cache/VersionedTbCache.java | 8 +++- 7 files changed, 52 insertions(+), 26 deletions(-) diff --git a/application/src/main/data/upgrade/3.7.0/schema_update.sql b/application/src/main/data/upgrade/3.7.0/schema_update.sql index 0e10667545..f964387359 100644 --- a/application/src/main/data/upgrade/3.7.0/schema_update.sql +++ b/application/src/main/data/upgrade/3.7.0/schema_update.sql @@ -14,10 +14,12 @@ -- limitations under the License. -- --- UPDATE PUBLIC CUSTOMERS START +-- KV VERSIONING UPDATE START CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000; CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000; ALTER TABLE attribute_kv ADD COLUMN version bigint default 0; ALTER TABLE ts_kv_latest ADD COLUMN version bigint default 0; + +-- KV VERSIONING UPDATE END diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java index d2ea960e68..4ce6571f1c 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/CaffeineTbTransactionalCache.java @@ -54,6 +54,11 @@ public abstract class CaffeineTbTransactionalCache get(K key, boolean transactionMode) { + return get(key); + } + @Override public void put(K key, V value) { lock.lock(); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java index fb852493ce..3dcb6e878f 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/RedisTbCacheTransaction.java @@ -31,7 +31,7 @@ public class RedisTbCacheTransaction get(K key) { + return get(key, false); + } + + @Override + public TbCacheValueWrapper get(K key, boolean transactionMode) { try (var connection = connectionFactory.getConnection()) { byte[] rawKey = getRawKey(key); - byte[] rawValue = doGet(connection, rawKey); + byte[] rawValue = doGet(connection, rawKey, transactionMode); if (rawValue == null || rawValue.length == 0) { return null; } else if (Arrays.equals(rawValue, BINARY_NULL_VALUE)) { @@ -96,18 +101,18 @@ public abstract class RedisTbTransactionalCache get(K key); + TbCacheValueWrapper get(K key, boolean transactionMode); + void put(K key, V value); void putIfAbsent(K key, V value); @@ -60,7 +62,7 @@ public interface TbTransactionalCache dbCall, boolean cacheNullValue) { - TbCacheValueWrapper cacheValueWrapper = get(key); + TbCacheValueWrapper cacheValueWrapper = get(key, true); if (cacheValueWrapper != null) { return cacheValueWrapper.get(); } @@ -95,7 +97,7 @@ public interface TbTransactionalCache R getAndPutInTransaction(K key, Supplier dbCall, Function cacheValueToResult, Function dbValueToCacheValue, boolean cacheNullValue) { - TbCacheValueWrapper cacheValueWrapper = get(key); + TbCacheValueWrapper cacheValueWrapper = get(key, true); if (cacheValueWrapper != null) { var cacheValue = cacheValueWrapper.get(); return cacheValue == null ? null : cacheValueToResult.apply(cacheValue); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java index 3ddcc53f38..67b6c86f96 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/VersionedRedisTbCache.java @@ -88,31 +88,30 @@ public abstract class VersionedRedisTbCache get(K key); default V get(K key, Supplier supplier) { + return get(key, supplier, true); + } + + default V get(K key, Supplier supplier, boolean putToCache) { return Optional.ofNullable(get(key)) .map(TbCacheValueWrapper::get) .orElseGet(() -> { V value = supplier.get(); - put(key, value); + if (putToCache) { + put(key, value); + } return value; }); }