added version to timeseries and attribute protos

This commit is contained in:
YevhenBondarenko 2024-07-09 15:19:01 +02:00
parent 77a420b6d6
commit d6367c9680
15 changed files with 253 additions and 201 deletions

View File

@ -45,11 +45,10 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
Long version = value != null ? value.getVersion() : 0; Long version = value != null ? value.getVersion() : 0;
put(key, value, version); doPut(key, value, version);
} }
@Override private void doPut(K key, V value, Long version) {
public void put(K key, V value, Long version) {
if (version == null) { if (version == null) {
return; return;
} }
@ -86,7 +85,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
} }
lock.lock(); lock.lock();
try { try {
put(key, null, version); doPut(key, null, version);
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -47,7 +47,7 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue local newValueWithVersion = struct.pack(">I8", newVersion) .. newValue
redis.call('SET', key, newValueWithVersion, 'EX', expiration) redis.call('SET', key, newValueWithVersion, 'EX', expiration)
end end
local function bytes_to_number(bytes) local function bytes_to_number(bytes)
local n = 0 local n = 0
for i = 1, 8 do for i = 1, 8 do
@ -96,13 +96,15 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
Long version = value != null ? value.getVersion() : 0; log.trace("put [{}][{}]", key, value);
put(key, value, version); Long version;
} if (value == null) {
version = 0L;
@Override } else if (value.getVersion() != null) {
public void put(K key, V value, Long version) { version = value.getVersion();
log.trace("put [{}][{}][{}]", key, value, version); } else {
return;
}
doPut(key, value, version, cacheTtl); doPut(key, value, version, cacheTtl);
} }

View File

@ -25,8 +25,6 @@ public interface VersionedTbCache<K extends Serializable, V extends Serializable
void put(K key, V value); void put(K key, V value);
void put(K key, V value, Long version);
void evict(K key); void evict(K key);
void evict(K key, Long version); void evict(K key, Long version);

View File

@ -16,11 +16,14 @@
package org.thingsboard.server.common.data.kv; package org.thingsboard.server.common.data.kv;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.Data;
import java.util.Optional; import java.util.Optional;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Data
public class BaseAttributeKvEntry implements AttributeKvEntry { public class BaseAttributeKvEntry implements AttributeKvEntry {
private static final long serialVersionUID = -6460767583563159407L; private static final long serialVersionUID = -6460767583563159407L;
@ -47,11 +50,6 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
this(kv, lastUpdateTs); this(kv, lastUpdateTs);
} }
@Override
public long getLastUpdateTs() {
return lastUpdateTs;
}
@Override @Override
public String getKey() { public String getKey() {
return kv.getKey(); return kv.getKey();
@ -97,35 +95,4 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
return kv.getValue(); return kv.getValue();
} }
@Override
public Long getVersion() {
return version;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BaseAttributeKvEntry that = (BaseAttributeKvEntry) o;
if (lastUpdateTs != that.lastUpdateTs) return false;
return kv.equals(that.kv);
}
@Override
public int hashCode() {
int result = (int) (lastUpdateTs ^ (lastUpdateTs >>> 32));
result = 31 * result + kv.hashCode();
return result;
}
@Override
public String toString() {
return "BaseAttributeKvEntry{" +
"lastUpdateTs=" + lastUpdateTs +
", kv=" + kv +
'}';
}
} }

View File

@ -16,9 +16,12 @@
package org.thingsboard.server.common.data.kv; package org.thingsboard.server.common.data.kv;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.Data;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@Data
public class BasicTsKvEntry implements TsKvEntry { public class BasicTsKvEntry implements TsKvEntry {
private static final int MAX_CHARS_PER_DATA_POINT = 512; private static final int MAX_CHARS_PER_DATA_POINT = 512;
protected final long ts; protected final long ts;
@ -79,33 +82,6 @@ public class BasicTsKvEntry implements TsKvEntry {
return kv.getValue(); return kv.getValue();
} }
@Override
public long getTs() {
return ts;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof BasicTsKvEntry)) return false;
BasicTsKvEntry that = (BasicTsKvEntry) o;
return getTs() == that.getTs() &&
Objects.equals(kv, that.kv);
}
@Override
public int hashCode() {
return Objects.hash(getTs(), kv);
}
@Override
public String toString() {
return "BasicTsKvEntry{" +
"ts=" + ts +
", kv=" + kv +
'}';
}
@Override @Override
public String getValueAsString() { public String getValueAsString() {
return kv.getValueAsString(); return kv.getValueAsString();
@ -127,8 +103,4 @@ public class BasicTsKvEntry implements TsKvEntry {
return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT); return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT);
} }
@Override
public Long getVersion() {
return version;
}
} }

View File

@ -86,8 +86,15 @@ public class KvProtoUtil {
.setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build();
} }
public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry, long version) {
return TransportProtos.TsKvProto.newBuilder()
.setTs(ts)
.setVersion(version)
.setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build();
}
public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) { public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) {
return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv())); return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv()), proto.getVersion());
} }
public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) { public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) {

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.common.util;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.ApiUsageStateValue;
@ -256,42 +258,47 @@ public class ProtoUtils {
if (msg.getValues() != null) { if (msg.getValues() != null) {
for (AttributeKvEntry attributeKvEntry : msg.getValues()) { for (AttributeKvEntry attributeKvEntry : msg.getValues()) {
TransportProtos.AttributeValueProto.Builder attributeValueBuilder = TransportProtos.AttributeValueProto.newBuilder() builder.addValues(toProto(attributeKvEntry));
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs())
.setKey(attributeKvEntry.getKey());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN -> {
attributeKvEntry.getBooleanValue().ifPresent(attributeValueBuilder::setBoolV);
attributeValueBuilder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V);
}
case STRING -> {
attributeKvEntry.getStrValue().ifPresent(attributeValueBuilder::setStringV);
attributeValueBuilder.setHasV(attributeKvEntry.getStrValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.STRING_V);
}
case DOUBLE -> {
attributeKvEntry.getDoubleValue().ifPresent(attributeValueBuilder::setDoubleV);
attributeValueBuilder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V);
}
case LONG -> {
attributeKvEntry.getLongValue().ifPresent(attributeValueBuilder::setLongV);
attributeValueBuilder.setHasV(attributeKvEntry.getLongValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.LONG_V);
}
case JSON -> {
attributeKvEntry.getJsonValue().ifPresent(attributeValueBuilder::setJsonV);
attributeValueBuilder.setHasV(attributeKvEntry.getJsonValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.JSON_V);
}
}
builder.addValues(attributeValueBuilder.build());
} }
} }
return builder.build(); return builder.build();
} }
public static TransportProtos.AttributeValueProto toProto(AttributeKvEntry attributeKvEntry) {
TransportProtos.AttributeValueProto.Builder builder = TransportProtos.AttributeValueProto.newBuilder()
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs())
.setKey(attributeKvEntry.getKey());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN:
attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV);
builder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
builder.setType(TransportProtos.KeyValueType.BOOLEAN_V);
break;
case STRING:
attributeKvEntry.getStrValue().ifPresent(builder::setStringV);
builder.setHasV(attributeKvEntry.getStrValue().isPresent());
builder.setType(TransportProtos.KeyValueType.STRING_V);
break;
case DOUBLE:
attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV);
builder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
builder.setType(TransportProtos.KeyValueType.DOUBLE_V);
break;
case LONG:
attributeKvEntry.getLongValue().ifPresent(builder::setLongV);
builder.setHasV(attributeKvEntry.getLongValue().isPresent());
builder.setType(TransportProtos.KeyValueType.LONG_V);
break;
case JSON:
attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV);
builder.setHasV(attributeKvEntry.getJsonValue().isPresent());
builder.setType(TransportProtos.KeyValueType.JSON_V);
break;
}
builder.setVersion(attributeKvEntry.getVersion());
return builder.build();
}
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) {
return new DeviceAttributesEventNotificationMsg( return new DeviceAttributesEventNotificationMsg(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
@ -500,20 +507,25 @@ public class ProtoUtils {
} }
List<AttributeKvEntry> result = new ArrayList<>(); List<AttributeKvEntry> result = new ArrayList<>();
for (TransportProtos.AttributeValueProto kvEntry : valuesList) { for (TransportProtos.AttributeValueProto kvEntry : valuesList) {
boolean hasValue = kvEntry.getHasV(); result.add(fromProto(kvEntry));
KvEntry entry = switch (kvEntry.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null);
case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
default -> null;
};
result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry));
} }
return result; return result;
} }
public static AttributeKvEntry fromProto(TransportProtos.AttributeValueProto proto) {
boolean hasValue = proto.getHasV();
String key = proto.getKey();
KvEntry entry = switch (proto.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(key, hasValue ? proto.getBoolV() : null);
case LONG_V -> new LongDataEntry(key, hasValue ? proto.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(key, hasValue ? proto.getDoubleV() : null);
case STRING_V -> new StringDataEntry(key, hasValue ? proto.getStringV() : null);
case JSON_V -> new JsonDataEntry(key, hasValue ? proto.getJsonV() : null);
default -> null;
};
return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.getVersion());
}
public static TransportProtos.DeviceProto toProto(Device device) { public static TransportProtos.DeviceProto toProto(Device device) {
var builder = TransportProtos.DeviceProto.newBuilder() var builder = TransportProtos.DeviceProto.newBuilder()
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())

View File

@ -157,11 +157,13 @@ message AttributeValueProto {
string string_v = 7; string string_v = 7;
string json_v = 8; string json_v = 8;
optional string key = 9; optional string key = 9;
int64 version = 10;
} }
message TsKvProto { message TsKvProto {
int64 ts = 1; int64 ts = 1;
KeyValueProto kv = 2; KeyValueProto kv = 2;
int64 version = 3;
} }
message TsKvListProto { message TsKvListProto {

View File

@ -26,15 +26,8 @@ import org.thingsboard.server.cache.TbRedisSerializer;
import org.thingsboard.server.cache.VersionedRedisTbCache; import org.thingsboard.server.cache.VersionedRedisTbCache;
import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AttributeCache") @Service("AttributeCache")
@ -44,54 +37,13 @@ public class AttributeRedisCache extends VersionedRedisTbCache<AttributeCacheKey
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
@Override @Override
public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException { public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException {
AttributeValueProto.Builder builder = AttributeValueProto.newBuilder() return ProtoUtils.toProto(attributeKvEntry).toByteArray();
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN:
attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV);
builder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
builder.setType(KeyValueType.BOOLEAN_V);
break;
case STRING:
attributeKvEntry.getStrValue().ifPresent(builder::setStringV);
builder.setHasV(attributeKvEntry.getStrValue().isPresent());
builder.setType(KeyValueType.STRING_V);
break;
case DOUBLE:
attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV);
builder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
builder.setType(KeyValueType.DOUBLE_V);
break;
case LONG:
attributeKvEntry.getLongValue().ifPresent(builder::setLongV);
builder.setHasV(attributeKvEntry.getLongValue().isPresent());
builder.setType(KeyValueType.LONG_V);
break;
case JSON:
attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV);
builder.setHasV(attributeKvEntry.getJsonValue().isPresent());
builder.setType(KeyValueType.JSON_V);
break;
}
return builder.build().toByteArray();
} }
@Override @Override
public AttributeKvEntry deserialize(AttributeCacheKey key, byte[] bytes) throws SerializationException { public AttributeKvEntry deserialize(AttributeCacheKey key, byte[] bytes) throws SerializationException {
try { try {
AttributeValueProto proto = AttributeValueProto.parseFrom(bytes); return ProtoUtils.fromProto(AttributeValueProto.parseFrom(bytes));
boolean hasValue = proto.getHasV();
KvEntry entry = switch (proto.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null);
case LONG_V -> new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null);
case STRING_V -> new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null);
case JSON_V -> new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null);
default ->
throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !");
};
return new BaseAttributeKvEntry(proto.getLastUpdateTs(), entry);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new SerializationException(e.getMessage()); throw new SerializationException(e.getMessage());
} }

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
@ -159,7 +160,7 @@ public class CachedAttributesService implements AttributesService {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) { for (AttributeKvEntry foundInDbAttribute : result) {
put(entityId, scope, foundInDbAttribute, foundInDbAttribute.getVersion()); put(entityId, scope, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
} }
for (String key : notFoundAttributeKeys) { for (String key : notFoundAttributeKeys) {
@ -218,8 +219,7 @@ public class CachedAttributesService implements AttributesService {
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation); AttributeUtils.validate(attribute, valueNoXssValidation);
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); return doSave(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor);
} }
@Override @Override
@ -234,19 +234,25 @@ public class CachedAttributesService implements AttributesService {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size()); List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) { for (var attribute : attributes) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); futures.add(doSave(tenantId, entityId, scope, attribute));
futures.add(Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor));
} }
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
private Long put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) { private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> {
put(entityId, scope, new BaseAttributeKvEntry(((BaseAttributeKvEntry)attribute).getKv(), attribute.getLastUpdateTs(), version));
return version;
}, cacheExecutor);
}
private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
String key = attribute.getKey(); String key = attribute.getKey();
log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute); log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute);
cache.put(new AttributeCacheKey(scope, entityId, key), attribute, version); cache.put(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache put.", entityId, scope, key); log.trace("[{}][{}][{}] after cache put.", entityId, scope, key);
return version;
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
@ -66,9 +67,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
@Override @Override
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
future = Futures.transform(future, x -> { future = Futures.transform(future, version -> {
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry, x); cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), new BasicTsKvEntry(tsKvEntry.getTs(), ((BasicTsKvEntry) tsKvEntry).getKv(), version));
return x; return version;
}, },
cacheExecutorService); cacheExecutorService);
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
@ -94,8 +95,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
if (x.isRemoved()) { if (x.isRemoved()) {
TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey()); TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey());
Long version = x.getVersion(); Long version = x.getVersion();
if (x.getData() != null) { TsKvEntry newTsKvEntry = x.getData();
cache.put(key, x.getData(), version); if (newTsKvEntry != null) {
cache.put(key, new BasicTsKvEntry(newTsKvEntry.getTs(), ((BasicTsKvEntry) newTsKvEntry).getKv(), version));
} else { } else {
cache.evict(key, version); cache.evict(key, version);
} }

View File

@ -39,7 +39,7 @@ public class TsLatestRedisCache extends VersionedRedisTbCache<TsLatestCacheKey,
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
@Override @Override
public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException { public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException {
return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry).toByteArray(); return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry, tsKvEntry.getVersion()).toByteArray();
} }
@Override @Override

View File

@ -26,7 +26,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.service.AbstractServiceTest; import org.thingsboard.server.dao.service.AbstractServiceTest;
@ -74,7 +72,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attr)).get(); attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attr)).get();
Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attr.getKey()).get(); Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attr.getKey()).get();
Assert.assertTrue(saved.isPresent()); Assert.assertTrue(saved.isPresent());
Assert.assertEquals(attr, saved.get()); equalsIgnoreVersion(attr, saved.get());
} }
@Test @Test
@ -87,14 +85,15 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get();
Assert.assertTrue(saved.isPresent()); Assert.assertTrue(saved.isPresent());
Assert.assertEquals(attrOld, saved.get()); equalsIgnoreVersion(attrOld, saved.get());
KvEntry attrNewValue = new StringDataEntry("attribute1", "value2"); KvEntry attrNewValue = new StringDataEntry("attribute1", "value2");
AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L); AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L);
attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attrNew)).get(); attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attrNew)).get();
saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get();
Assert.assertEquals(attrNew, saved.get()); Assert.assertTrue(saved.isPresent());
equalsIgnoreVersion(attrNew, saved.get());
} }
@Test @Test
@ -117,8 +116,8 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
Assert.assertNotNull(saved); Assert.assertNotNull(saved);
Assert.assertEquals(2, saved.size()); Assert.assertEquals(2, saved.size());
Assert.assertEquals(attrANew, saved.get(0)); equalsIgnoreVersion(attrANew, saved.get(0));
Assert.assertEquals(attrBNew, saved.get(1)); equalsIgnoreVersion(attrBNew, saved.get(1));
} }
@Test @Test
@ -253,6 +252,11 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
})); }));
futures.add(pool.submit(() -> saveAttribute(tenantId, deviceId, scope, key, NEW_VALUE))); futures.add(pool.submit(() -> saveAttribute(tenantId, deviceId, scope, key, NEW_VALUE)));
Futures.allAsList(futures).get(10, TimeUnit.SECONDS); Futures.allAsList(futures).get(10, TimeUnit.SECONDS);
String attributeValue = getAttributeValue(tenantId, deviceId, scope, key);
if (!NEW_VALUE.equals(attributeValue)) {
System.out.println();
}
Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key)); Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key));
} }
@ -309,5 +313,10 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
} }
} }
private void equalsIgnoreVersion(AttributeKvEntry expected, AttributeKvEntry actual) {
Assert.assertEquals(expected.getKey(), actual.getKey());
Assert.assertEquals(expected.getValue(), actual.getValue());
Assert.assertEquals(expected.getLastUpdateTs(), actual.getLastUpdateTs());
}
} }

View File

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.service.attributes.sql;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@DaoSqlTest
public class AttributeCacheServiceSqlTest extends AbstractServiceTest {
private static final String TEST_KEY = "key";
private static final String TEST_VALUE = "value";
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID());
@Autowired
VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache;
@Test
public void testPutAndGet() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
AttributeKvEntry testValue2 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 2L);
cache.put(testKey, testValue2);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue2, wrapper.get());
AttributeKvEntry testValue3 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 0L);
cache.put(testKey, testValue3);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue2, wrapper.get());
cache.evict(testKey);
}
@Test
public void testEvictWithVersion() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
cache.evict(testKey, 2L);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertNull(wrapper.get());
cache.evict(testKey);
}
@Test
public void testEvict() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
cache.evict(testKey);
wrapper = cache.get(testKey);
assertNull(wrapper);
}
}

View File

@ -137,7 +137,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
toTsEntry(TS, booleanKvEntry)); toTsEntry(TS, booleanKvEntry));
Collections.sort(expected, Comparator.comparing(KvEntry::getKey)); Collections.sort(expected, Comparator.comparing(KvEntry::getKey));
assertEquals(expected, tsList); for (int i = 0; i < expected.size(); i++) {
var expectedEntry = expected.get(i);
var actualEntry = tsList.get(i);
equalsIgnoreVersion(expectedEntry, actualEntry);
}
} }
private EntityView saveAndCreateEntityView(DeviceId deviceId, List<String> timeseries) { private EntityView saveAndCreateEntityView(DeviceId deviceId, List<String> timeseries) {
@ -160,7 +165,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(1, entries.size()); Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entries.get(0));
} }
@Test @Test
@ -176,7 +181,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null)); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get());
} }
@Test @Test
@ -186,7 +191,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null)); equalsIgnoreVersion(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.get());
} }
public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception { public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception {
@ -209,7 +214,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get()); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get());
} }
@Test @Test
@ -239,7 +244,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(1, entries.size()); Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS - 1, stringKvEntry), entries.get(0)); equalsIgnoreVersion(toTsEntry(TS - 1, stringKvEntry), entries.get(0));
} }
@Test @Test
@ -794,5 +799,10 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
return new BasicTsKvEntry(ts, entry); return new BasicTsKvEntry(ts, entry);
} }
private static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) {
assertEquals(expected.getKey(), actual.getKey());
assertEquals(expected.getValue(), actual.getValue());
assertEquals(expected.getTs(), actual.getTs());
}
} }