Merge remote-tracking branch 'origin/feature/attr_tskv_version' into feature/entities-version

This commit is contained in:
ViacheslavKlimov 2024-07-16 10:37:21 +03:00
commit b46e9fb6ec
19 changed files with 321 additions and 217 deletions

View File

@ -14,7 +14,18 @@
-- limitations under the License. -- limitations under the License.
-- --
-- Optimistic locking update START -- KV VERSIONING UPDATE START
CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000;
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
ALTER TABLE attribute_kv ADD COLUMN version bigint default 0;
ALTER TABLE ts_kv_latest ADD COLUMN version bigint default 0;
-- KV VERSIONING UPDATE END
-- ENTITIES VERSIONING UPDATE START
ALTER TABLE device ADD COLUMN IF NOT EXISTS version INT DEFAULT 1; ALTER TABLE device ADD COLUMN IF NOT EXISTS version INT DEFAULT 1;
ALTER TABLE device_profile ADD COLUMN IF NOT EXISTS version INT DEFAULT 1; ALTER TABLE device_profile ADD COLUMN IF NOT EXISTS version INT DEFAULT 1;
@ -30,4 +41,4 @@ ALTER TABLE dashboard ADD COLUMN IF NOT EXISTS version INT DEFAULT 1;
ALTER TABLE widget_type ADD COLUMN IF NOT EXISTS version INT DEFAULT 1; ALTER TABLE widget_type ADD COLUMN IF NOT EXISTS version INT DEFAULT 1;
ALTER TABLE widgets_bundle ADD COLUMN IF NOT EXISTS version INT DEFAULT 1; ALTER TABLE widgets_bundle ADD COLUMN IF NOT EXISTS version INT DEFAULT 1;
-- Optimistic locking update END -- ENTITIES VERSIONING UPDATE END

View File

@ -61,6 +61,10 @@ public class DefaultCacheCleanupService implements CacheCleanupService {
log.info("Clearing cache to upgrade from version 3.6.4 to 3.7.0"); log.info("Clearing cache to upgrade from version 3.6.4 to 3.7.0");
clearAll(); clearAll();
break; break;
case "3.7.0":
log.info("Clearing cache to upgrade from version 3.7.0 to 3.7.1");
clearAll();
break;
default: default:
//Do nothing, since cache cleanup is optional. //Do nothing, since cache cleanup is optional.
} }
@ -81,7 +85,7 @@ public class DefaultCacheCleanupService implements CacheCleanupService {
if (redisTemplate.isPresent()) { if (redisTemplate.isPresent()) {
log.info("Flushing all caches"); log.info("Flushing all caches");
redisTemplate.get().execute((RedisCallback<Object>) connection -> { redisTemplate.get().execute((RedisCallback<Object>) connection -> {
connection.flushAll(); connection.serverCommands().flushAll();
return null; return null;
}); });
return; return;

View File

@ -63,7 +63,9 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
return finalFuture.get().get().isPresent(); return finalFuture.get().get().isPresent();
}); });
TsKvEntry actualTsKvEntry = finalFuture.get().get().get(); TsKvEntry actualTsKvEntry = finalFuture.get().get().get();
Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry); Assert.assertEquals(expectedTsKvEntry.getKey(), actualTsKvEntry.getKey());
Assert.assertEquals(expectedTsKvEntry.getValue(), actualTsKvEntry.getValue());
Assert.assertEquals(expectedTsKvEntry.getTs(), actualTsKvEntry.getTs());
} }
protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception { protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception {
@ -95,20 +97,27 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name())); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
.atMost(40, TimeUnit.SECONDS) .atMost(40, TimeUnit.SECONDS)
.until(() -> { .until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); var foundEntry = tsService.findAllLatest(tenantId, savedGateway.getId()).get().stream()
return finalFuture.get().get().contains(tsKvEntry); .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey()))
.filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue()))
.filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs())
.findFirst();
return foundEntry.isPresent();
}); });
for (Device device : devices) { for (Device device : devices) {
await(alias + messageName(STATE) + ", device: " + device.getName()) await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS) .atMost(40, TimeUnit.SECONDS)
.until(() -> { .until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); var foundEntry = tsService.findAllLatest(tenantId, device.getId()).get().stream()
return finalFuture.get().get().contains(tsKvEntry); .filter(tsKv -> tsKv.getKey().equals(tsKvEntry.getKey()))
.filter(tsKv -> tsKv.getValue().equals(tsKvEntry.getValue()))
.filter(tsKv -> tsKv.getTs() == tsKvEntry.getTs())
.findFirst();
return foundEntry.isPresent();
}); });
} }
} }

View File

@ -78,7 +78,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); return finalFuture.get().get().size() == (listTsKvEntry.size() + 1);
}); });
Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry));
} }
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple() throws Exception { protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysPrimitiveSimple() throws Exception {
@ -107,7 +107,20 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
return finalFuture.get().get().size() == (listTsKvEntry.size() + 1); return finalFuture.get().get().size() == (listTsKvEntry.size() + 1);
}); });
Assert.assertTrue("Actual tsKvEntrys is not containsAll Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); Assert.assertTrue("Actual tsKvEntries is not containsAll Expected tsKvEntries", containsIgnoreVersion(finalFuture.get().get(), listTsKvEntry));
} }
private static boolean containsIgnoreVersion(List<TsKvEntry> expected, List<TsKvEntry> actual) {
for (TsKvEntry actualEntry : actual) {
var found = expected.stream()
.filter(tsKv -> tsKv.getKey().equals(actualEntry.getKey()))
.filter(tsKv -> tsKv.getValue().equals(actualEntry.getValue()))
.filter(tsKv -> tsKv.getTs() == actualEntry.getTs())
.findFirst();
if (found.isEmpty()) {
return false;
}
}
return true;
}
} }

View File

@ -40,11 +40,10 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
Long version = value != null ? value.getVersion() : 0; Long version = value != null ? value.getVersion() : 0;
put(key, value, version); doPut(key, value, version);
} }
@Override private void doPut(K key, V value, Long version) {
public void put(K key, V value, Long version) {
if (version == null) { if (version == null) {
return; return;
} }
@ -81,7 +80,7 @@ public abstract class VersionedCaffeineTbCache<K extends Serializable, V extends
if (version == null) { if (version == null) {
return; return;
} }
put(key, null, version); doPut(key, null, version);
} }
@Override @Override

View File

@ -60,7 +60,7 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
if currentVersionBytes and #currentVersionBytes == 8 then if currentVersionBytes and #currentVersionBytes == 8 then
local currentVersion = bytes_to_number(currentVersionBytes) local currentVersion = bytes_to_number(currentVersionBytes)
if newVersion > currentVersion or newVersion == 1 and currentVersion > 1 then if newVersion > currentVersion then
setNewValue() setNewValue()
end end
else else
@ -68,7 +68,7 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
setNewValue() setNewValue()
end end
"""); """);
static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("05a09f34f523429c96c6eaabbe6f2595f5cba2c3"); static final byte[] SET_VERSIONED_VALUE_SHA = StringRedisSerializer.UTF_8.serialize("80e56cbbbb4bd9cb150d6537f1e7d8df4fddb252");
public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) { public VersionedRedisTbCache(String cacheName, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration, TbRedisSerializer<K, V> valueSerializer) {
super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer); super(cacheName, cacheSpecsMap, connectionFactory, configuration, valueSerializer);
@ -97,12 +97,10 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
@Override @Override
public void put(K key, V value) { public void put(K key, V value) {
Long version = value != null ? value.getVersion() : 0; Long version = getVersion(value);
put(key, value, version); if (version == null) {
} return;
}
@Override
public void put(K key, V value, Long version) {
doPut(key, value, version, cacheTtl); doPut(key, value, version, cacheTtl);
} }
@ -112,16 +110,16 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
super.put(key, value, connection, true); // because scripting commands are not supported in transaction mode super.put(key, value, connection, true); // because scripting commands are not supported in transaction mode
return; return;
} }
Long version = value != null ? value.getVersion() : 0; Long version = getVersion(value);
if (version == null) {
return;
}
byte[] rawKey = getRawKey(key); byte[] rawKey = getRawKey(key);
doPut(rawKey, value, version, cacheTtl, connection); doPut(rawKey, value, version, cacheTtl, connection);
} }
private void doPut(K key, V value, Long version, Expiration expiration) { private void doPut(K key, V value, Long version, Expiration expiration) {
log.trace("put [{}][{}][{}]", key, value, version); log.trace("put [{}][{}][{}]", key, value, version);
if (version == null) {
return;
}
final byte[] rawKey = getRawKey(key); final byte[] rawKey = getRawKey(key);
try (var connection = getConnection(rawKey)) { try (var connection = getConnection(rawKey)) {
doPut(rawKey, value, version, expiration, connection); doPut(rawKey, value, version, expiration, connection);
@ -167,4 +165,14 @@ public abstract class VersionedRedisTbCache<K extends Serializable, V extends Se
throw new NotImplementedException("evictOrPut is not supported by versioned cache"); throw new NotImplementedException("evictOrPut is not supported by versioned cache");
} }
private Long getVersion(V value) {
if (value == null) {
return 0L;
} else if (value.getVersion() != null) {
return value.getVersion();
} else {
return null;
}
}
} }

View File

@ -44,8 +44,6 @@ public interface VersionedTbCache<K extends Serializable, V extends Serializable
void put(K key, V value); void put(K key, V value);
void put(K key, V value, Long version);
void evict(K key); void evict(K key);
void evict(Collection<K> keys); void evict(Collection<K> keys);

View File

@ -16,11 +16,14 @@
package org.thingsboard.server.common.data.kv; package org.thingsboard.server.common.data.kv;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.Data;
import java.util.Optional; import java.util.Optional;
/** /**
* @author Andrew Shvayka * @author Andrew Shvayka
*/ */
@Data
public class BaseAttributeKvEntry implements AttributeKvEntry { public class BaseAttributeKvEntry implements AttributeKvEntry {
private static final long serialVersionUID = -6460767583563159407L; private static final long serialVersionUID = -6460767583563159407L;
@ -47,11 +50,6 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
this(kv, lastUpdateTs); this(kv, lastUpdateTs);
} }
@Override
public long getLastUpdateTs() {
return lastUpdateTs;
}
@Override @Override
public String getKey() { public String getKey() {
return kv.getKey(); return kv.getKey();
@ -97,35 +95,4 @@ public class BaseAttributeKvEntry implements AttributeKvEntry {
return kv.getValue(); return kv.getValue();
} }
@Override
public Long getVersion() {
return version;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BaseAttributeKvEntry that = (BaseAttributeKvEntry) o;
if (lastUpdateTs != that.lastUpdateTs) return false;
return kv.equals(that.kv);
}
@Override
public int hashCode() {
int result = (int) (lastUpdateTs ^ (lastUpdateTs >>> 32));
result = 31 * result + kv.hashCode();
return result;
}
@Override
public String toString() {
return "BaseAttributeKvEntry{" +
"lastUpdateTs=" + lastUpdateTs +
", kv=" + kv +
'}';
}
} }

View File

@ -16,9 +16,12 @@
package org.thingsboard.server.common.data.kv; package org.thingsboard.server.common.data.kv;
import jakarta.validation.Valid; import jakarta.validation.Valid;
import lombok.Data;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@Data
public class BasicTsKvEntry implements TsKvEntry { public class BasicTsKvEntry implements TsKvEntry {
private static final int MAX_CHARS_PER_DATA_POINT = 512; private static final int MAX_CHARS_PER_DATA_POINT = 512;
protected final long ts; protected final long ts;
@ -79,33 +82,6 @@ public class BasicTsKvEntry implements TsKvEntry {
return kv.getValue(); return kv.getValue();
} }
@Override
public long getTs() {
return ts;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof BasicTsKvEntry)) return false;
BasicTsKvEntry that = (BasicTsKvEntry) o;
return getTs() == that.getTs() &&
Objects.equals(kv, that.kv);
}
@Override
public int hashCode() {
return Objects.hash(getTs(), kv);
}
@Override
public String toString() {
return "BasicTsKvEntry{" +
"ts=" + ts +
", kv=" + kv +
'}';
}
@Override @Override
public String getValueAsString() { public String getValueAsString() {
return kv.getValueAsString(); return kv.getValueAsString();
@ -127,8 +103,4 @@ public class BasicTsKvEntry implements TsKvEntry {
return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT); return Math.max(1, (length + MAX_CHARS_PER_DATA_POINT - 1) / MAX_CHARS_PER_DATA_POINT);
} }
@Override
public Long getVersion() {
return version;
}
} }

View File

@ -86,8 +86,20 @@ public class KvProtoUtil {
.setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build(); .setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry)).build();
} }
public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry, Long version) {
var builder = TransportProtos.TsKvProto.newBuilder()
.setTs(ts)
.setKv(KvProtoUtil.toKeyValueTypeProto(kvEntry));
if (version != null) {
builder.setVersion(version);
}
return builder.build();
}
public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) { public static TsKvEntry fromTsKvProto(TransportProtos.TsKvProto proto) {
return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv())); return new BasicTsKvEntry(proto.getTs(), fromTsKvProto(proto.getKv()), proto.hasVersion() ? proto.getVersion() : null);
} }
public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) { public static TransportProtos.KeyValueProto toKeyValueTypeProto(KvEntry kvEntry) {

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.common.util;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.ApiUsageStateValue;
@ -256,42 +258,51 @@ public class ProtoUtils {
if (msg.getValues() != null) { if (msg.getValues() != null) {
for (AttributeKvEntry attributeKvEntry : msg.getValues()) { for (AttributeKvEntry attributeKvEntry : msg.getValues()) {
TransportProtos.AttributeValueProto.Builder attributeValueBuilder = TransportProtos.AttributeValueProto.newBuilder() builder.addValues(toProto(attributeKvEntry));
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs())
.setKey(attributeKvEntry.getKey());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN -> {
attributeKvEntry.getBooleanValue().ifPresent(attributeValueBuilder::setBoolV);
attributeValueBuilder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V);
}
case STRING -> {
attributeKvEntry.getStrValue().ifPresent(attributeValueBuilder::setStringV);
attributeValueBuilder.setHasV(attributeKvEntry.getStrValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.STRING_V);
}
case DOUBLE -> {
attributeKvEntry.getDoubleValue().ifPresent(attributeValueBuilder::setDoubleV);
attributeValueBuilder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V);
}
case LONG -> {
attributeKvEntry.getLongValue().ifPresent(attributeValueBuilder::setLongV);
attributeValueBuilder.setHasV(attributeKvEntry.getLongValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.LONG_V);
}
case JSON -> {
attributeKvEntry.getJsonValue().ifPresent(attributeValueBuilder::setJsonV);
attributeValueBuilder.setHasV(attributeKvEntry.getJsonValue().isPresent());
attributeValueBuilder.setType(TransportProtos.KeyValueType.JSON_V);
}
}
builder.addValues(attributeValueBuilder.build());
} }
} }
return builder.build(); return builder.build();
} }
public static TransportProtos.AttributeValueProto toProto(AttributeKvEntry attributeKvEntry) {
TransportProtos.AttributeValueProto.Builder builder = TransportProtos.AttributeValueProto.newBuilder()
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs())
.setKey(attributeKvEntry.getKey());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN:
attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV);
builder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
builder.setType(TransportProtos.KeyValueType.BOOLEAN_V);
break;
case STRING:
attributeKvEntry.getStrValue().ifPresent(builder::setStringV);
builder.setHasV(attributeKvEntry.getStrValue().isPresent());
builder.setType(TransportProtos.KeyValueType.STRING_V);
break;
case DOUBLE:
attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV);
builder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
builder.setType(TransportProtos.KeyValueType.DOUBLE_V);
break;
case LONG:
attributeKvEntry.getLongValue().ifPresent(builder::setLongV);
builder.setHasV(attributeKvEntry.getLongValue().isPresent());
builder.setType(TransportProtos.KeyValueType.LONG_V);
break;
case JSON:
attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV);
builder.setHasV(attributeKvEntry.getJsonValue().isPresent());
builder.setType(TransportProtos.KeyValueType.JSON_V);
break;
}
if (attributeKvEntry.getVersion() != null) {
builder.setVersion(attributeKvEntry.getVersion());
}
return builder.build();
}
private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) {
return new DeviceAttributesEventNotificationMsg( return new DeviceAttributesEventNotificationMsg(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
@ -500,20 +511,25 @@ public class ProtoUtils {
} }
List<AttributeKvEntry> result = new ArrayList<>(); List<AttributeKvEntry> result = new ArrayList<>();
for (TransportProtos.AttributeValueProto kvEntry : valuesList) { for (TransportProtos.AttributeValueProto kvEntry : valuesList) {
boolean hasValue = kvEntry.getHasV(); result.add(fromProto(kvEntry));
KvEntry entry = switch (kvEntry.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null);
case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
default -> null;
};
result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry));
} }
return result; return result;
} }
public static AttributeKvEntry fromProto(TransportProtos.AttributeValueProto proto) {
boolean hasValue = proto.getHasV();
String key = proto.getKey();
KvEntry entry = switch (proto.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(key, hasValue ? proto.getBoolV() : null);
case LONG_V -> new LongDataEntry(key, hasValue ? proto.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(key, hasValue ? proto.getDoubleV() : null);
case STRING_V -> new StringDataEntry(key, hasValue ? proto.getStringV() : null);
case JSON_V -> new JsonDataEntry(key, hasValue ? proto.getJsonV() : null);
default -> null;
};
return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.hasVersion() ? proto.getVersion() : null);
}
public static TransportProtos.DeviceProto toProto(Device device) { public static TransportProtos.DeviceProto toProto(Device device) {
var builder = TransportProtos.DeviceProto.newBuilder() var builder = TransportProtos.DeviceProto.newBuilder()
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())

View File

@ -157,11 +157,13 @@ message AttributeValueProto {
string string_v = 7; string string_v = 7;
string json_v = 8; string json_v = 8;
optional string key = 9; optional string key = 9;
optional int64 version = 10;
} }
message TsKvProto { message TsKvProto {
int64 ts = 1; int64 ts = 1;
KeyValueProto kv = 2; KeyValueProto kv = 2;
optional int64 version = 3;
} }
message TsKvListProto { message TsKvListProto {

View File

@ -26,15 +26,8 @@ import org.thingsboard.server.cache.TbRedisSerializer;
import org.thingsboard.server.cache.VersionedRedisTbCache; import org.thingsboard.server.cache.VersionedRedisTbCache;
import org.thingsboard.server.common.data.CacheConstants; import org.thingsboard.server.common.data.CacheConstants;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") @ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
@Service("AttributeCache") @Service("AttributeCache")
@ -44,54 +37,13 @@ public class AttributeRedisCache extends VersionedRedisTbCache<AttributeCacheKey
super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { super(CacheConstants.ATTRIBUTES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
@Override @Override
public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException { public byte[] serialize(AttributeKvEntry attributeKvEntry) throws SerializationException {
AttributeValueProto.Builder builder = AttributeValueProto.newBuilder() return ProtoUtils.toProto(attributeKvEntry).toByteArray();
.setLastUpdateTs(attributeKvEntry.getLastUpdateTs());
switch (attributeKvEntry.getDataType()) {
case BOOLEAN:
attributeKvEntry.getBooleanValue().ifPresent(builder::setBoolV);
builder.setHasV(attributeKvEntry.getBooleanValue().isPresent());
builder.setType(KeyValueType.BOOLEAN_V);
break;
case STRING:
attributeKvEntry.getStrValue().ifPresent(builder::setStringV);
builder.setHasV(attributeKvEntry.getStrValue().isPresent());
builder.setType(KeyValueType.STRING_V);
break;
case DOUBLE:
attributeKvEntry.getDoubleValue().ifPresent(builder::setDoubleV);
builder.setHasV(attributeKvEntry.getDoubleValue().isPresent());
builder.setType(KeyValueType.DOUBLE_V);
break;
case LONG:
attributeKvEntry.getLongValue().ifPresent(builder::setLongV);
builder.setHasV(attributeKvEntry.getLongValue().isPresent());
builder.setType(KeyValueType.LONG_V);
break;
case JSON:
attributeKvEntry.getJsonValue().ifPresent(builder::setJsonV);
builder.setHasV(attributeKvEntry.getJsonValue().isPresent());
builder.setType(KeyValueType.JSON_V);
break;
}
return builder.build().toByteArray();
} }
@Override @Override
public AttributeKvEntry deserialize(AttributeCacheKey key, byte[] bytes) throws SerializationException { public AttributeKvEntry deserialize(AttributeCacheKey key, byte[] bytes) throws SerializationException {
try { try {
AttributeValueProto proto = AttributeValueProto.parseFrom(bytes); return ProtoUtils.fromProto(AttributeValueProto.parseFrom(bytes));
boolean hasValue = proto.getHasV();
KvEntry entry = switch (proto.getType()) {
case BOOLEAN_V -> new BooleanDataEntry(key.getKey(), hasValue ? proto.getBoolV() : null);
case LONG_V -> new LongDataEntry(key.getKey(), hasValue ? proto.getLongV() : null);
case DOUBLE_V -> new DoubleDataEntry(key.getKey(), hasValue ? proto.getDoubleV() : null);
case STRING_V -> new StringDataEntry(key.getKey(), hasValue ? proto.getStringV() : null);
case JSON_V -> new JsonDataEntry(key.getKey(), hasValue ? proto.getJsonV() : null);
default ->
throw new InvalidProtocolBufferException("Unrecognized type: " + proto.getType() + " !");
};
return new BaseAttributeKvEntry(proto.getLastUpdateTs(), entry);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new SerializationException(e.getMessage()); throw new SerializationException(e.getMessage());
} }

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
@ -159,7 +160,7 @@ public class CachedAttributesService implements AttributesService {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys); log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) { for (AttributeKvEntry foundInDbAttribute : result) {
put(entityId, scope, foundInDbAttribute, foundInDbAttribute.getVersion()); put(entityId, scope, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
} }
for (String key : notFoundAttributeKeys) { for (String key : notFoundAttributeKeys) {
@ -218,8 +219,7 @@ public class CachedAttributesService implements AttributesService {
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope); validate(entityId, scope);
AttributeUtils.validate(attribute, valueNoXssValidation); AttributeUtils.validate(attribute, valueNoXssValidation);
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); return doSave(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor);
} }
@Override @Override
@ -234,19 +234,25 @@ public class CachedAttributesService implements AttributesService {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size()); List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) { for (var attribute : attributes) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute); futures.add(doSave(tenantId, entityId, scope, attribute));
futures.add(Futures.transform(future, version -> put(entityId, scope, attribute, version), cacheExecutor));
} }
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
private Long put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute, Long version) { private ListenableFuture<Long> doSave(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
ListenableFuture<Long> future = attributesDao.save(tenantId, entityId, scope, attribute);
return Futures.transform(future, version -> {
put(entityId, scope, new BaseAttributeKvEntry(((BaseAttributeKvEntry)attribute).getKv(), attribute.getLastUpdateTs(), version));
return version;
}, cacheExecutor);
}
private void put(EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
String key = attribute.getKey(); String key = attribute.getKey();
log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute); log.trace("[{}][{}][{}] Before cache put: {}", entityId, scope, key, attribute);
cache.put(new AttributeCacheKey(scope, entityId, key), attribute, version); cache.put(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache put.", entityId, scope, key); log.trace("[{}][{}][{}] after cache put.", entityId, scope, key);
return version;
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
@ -66,9 +67,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
@Override @Override
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry); ListenableFuture<Long> future = sqlDao.saveLatest(tenantId, entityId, tsKvEntry);
future = Futures.transform(future, x -> { future = Futures.transform(future, version -> {
cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), tsKvEntry, x); cache.put(new TsLatestCacheKey(entityId, tsKvEntry.getKey()), new BasicTsKvEntry(tsKvEntry.getTs(), ((BasicTsKvEntry) tsKvEntry).getKv(), version));
return x; return version;
}, },
cacheExecutorService); cacheExecutorService);
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
@ -94,8 +95,9 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
if (x.isRemoved()) { if (x.isRemoved()) {
TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey()); TsLatestCacheKey key = new TsLatestCacheKey(entityId, query.getKey());
Long version = x.getVersion(); Long version = x.getVersion();
if (x.getData() != null) { TsKvEntry newTsKvEntry = x.getData();
cache.put(key, x.getData(), version); if (newTsKvEntry != null) {
cache.put(key, new BasicTsKvEntry(newTsKvEntry.getTs(), ((BasicTsKvEntry) newTsKvEntry).getKv(), version));
} else { } else {
cache.evict(key, version); cache.evict(key, version);
} }

View File

@ -39,7 +39,7 @@ public class TsLatestRedisCache extends VersionedRedisTbCache<TsLatestCacheKey,
super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() { super(CacheConstants.TS_LATEST_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<>() {
@Override @Override
public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException { public byte[] serialize(TsKvEntry tsKvEntry) throws SerializationException {
return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry).toByteArray(); return KvProtoUtil.toTsKvProto(tsKvEntry.getTs(), tsKvEntry, tsKvEntry.getVersion()).toByteArray();
} }
@Override @Override

View File

@ -26,7 +26,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.service.AbstractServiceTest; import org.thingsboard.server.dao.service.AbstractServiceTest;
@ -74,7 +72,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attr)).get(); attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attr)).get();
Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attr.getKey()).get(); Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attr.getKey()).get();
Assert.assertTrue(saved.isPresent()); Assert.assertTrue(saved.isPresent());
Assert.assertEquals(attr, saved.get()); equalsIgnoreVersion(attr, saved.get());
} }
@Test @Test
@ -87,14 +85,15 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); Optional<AttributeKvEntry> saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get();
Assert.assertTrue(saved.isPresent()); Assert.assertTrue(saved.isPresent());
Assert.assertEquals(attrOld, saved.get()); equalsIgnoreVersion(attrOld, saved.get());
KvEntry attrNewValue = new StringDataEntry("attribute1", "value2"); KvEntry attrNewValue = new StringDataEntry("attribute1", "value2");
AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L); AttributeKvEntry attrNew = new BaseAttributeKvEntry(attrNewValue, 73L);
attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attrNew)).get(); attributesService.save(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, Collections.singletonList(attrNew)).get();
saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get(); saved = attributesService.find(SYSTEM_TENANT_ID, deviceId, AttributeScope.CLIENT_SCOPE, attrOld.getKey()).get();
Assert.assertEquals(attrNew, saved.get()); Assert.assertTrue(saved.isPresent());
equalsIgnoreVersion(attrNew, saved.get());
} }
@Test @Test
@ -117,8 +116,8 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
Assert.assertNotNull(saved); Assert.assertNotNull(saved);
Assert.assertEquals(2, saved.size()); Assert.assertEquals(2, saved.size());
Assert.assertEquals(attrANew, saved.get(0)); equalsIgnoreVersion(attrANew, saved.get(0));
Assert.assertEquals(attrBNew, saved.get(1)); equalsIgnoreVersion(attrBNew, saved.get(1));
} }
@Test @Test
@ -253,6 +252,11 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
})); }));
futures.add(pool.submit(() -> saveAttribute(tenantId, deviceId, scope, key, NEW_VALUE))); futures.add(pool.submit(() -> saveAttribute(tenantId, deviceId, scope, key, NEW_VALUE)));
Futures.allAsList(futures).get(10, TimeUnit.SECONDS); Futures.allAsList(futures).get(10, TimeUnit.SECONDS);
String attributeValue = getAttributeValue(tenantId, deviceId, scope, key);
if (!NEW_VALUE.equals(attributeValue)) {
System.out.println();
}
Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key)); Assert.assertEquals(NEW_VALUE, getAttributeValue(tenantId, deviceId, scope, key));
} }
@ -309,5 +313,10 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
} }
} }
private void equalsIgnoreVersion(AttributeKvEntry expected, AttributeKvEntry actual) {
Assert.assertEquals(expected.getKey(), actual.getKey());
Assert.assertEquals(expected.getValue(), actual.getValue());
Assert.assertEquals(expected.getLastUpdateTs(), actual.getLastUpdateTs());
}
} }

View File

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.service.attributes.sql;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.attributes.AttributeCacheKey;
import org.thingsboard.server.dao.service.AbstractServiceTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@DaoSqlTest
public class AttributeCacheServiceSqlTest extends AbstractServiceTest {
private static final String TEST_KEY = "key";
private static final String TEST_VALUE = "value";
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID());
@Autowired
VersionedTbCache<AttributeCacheKey, AttributeKvEntry> cache;
@Test
public void testPutAndGet() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
AttributeKvEntry testValue2 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 2L);
cache.put(testKey, testValue2);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue2, wrapper.get());
AttributeKvEntry testValue3 = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 0L);
cache.put(testKey, testValue3);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue2, wrapper.get());
cache.evict(testKey);
}
@Test
public void testEvictWithVersion() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
cache.evict(testKey, 2L);
wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertNull(wrapper.get());
cache.evict(testKey);
}
@Test
public void testEvict() {
AttributeCacheKey testKey = new AttributeCacheKey(AttributeScope.CLIENT_SCOPE, DEVICE_ID, TEST_KEY);
AttributeKvEntry testValue = new BaseAttributeKvEntry(new StringDataEntry(TEST_KEY, TEST_VALUE), 1, 1L);
cache.put(testKey, testValue);
TbCacheValueWrapper<AttributeKvEntry> wrapper = cache.get(testKey);
assertNotNull(wrapper);
assertEquals(testValue, wrapper.get());
cache.evict(testKey);
wrapper = cache.get(testKey);
assertNull(wrapper);
}
}

View File

@ -137,7 +137,12 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
toTsEntry(TS, booleanKvEntry)); toTsEntry(TS, booleanKvEntry));
Collections.sort(expected, Comparator.comparing(KvEntry::getKey)); Collections.sort(expected, Comparator.comparing(KvEntry::getKey));
assertEquals(expected, tsList); for (int i = 0; i < expected.size(); i++) {
var expectedEntry = expected.get(i);
var actualEntry = tsList.get(i);
equalsIgnoreVersion(expectedEntry, actualEntry);
}
} }
private EntityView saveAndCreateEntityView(DeviceId deviceId, List<String> timeseries) { private EntityView saveAndCreateEntityView(DeviceId deviceId, List<String> timeseries) {
@ -160,7 +165,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(1, entries.size()); Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entries.get(0));
} }
@Test @Test
@ -176,7 +181,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.orElse(null)); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get());
} }
@Test @Test
@ -186,7 +191,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.orElse(null)); equalsIgnoreVersion(toTsEntry(TS, new StringDataEntry(STRING_KEY, "new")), entryOpt.get());
} }
public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception { public void testFindLatestOpt_givenSaveWithSameTSOverwriteTypeAndValue() throws Exception {
@ -209,7 +214,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS); Optional<TsKvEntry> entryOpt = tsService.findLatest(tenantId, deviceId, STRING_KEY).get(MAX_TIMEOUT, TimeUnit.SECONDS);
assertThat(entryOpt).isNotNull().isPresent(); assertThat(entryOpt).isNotNull().isPresent();
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entryOpt.get()); equalsIgnoreVersion(toTsEntry(TS, stringKvEntry), entryOpt.get());
} }
@Test @Test
@ -239,7 +244,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS); List<TsKvEntry> entries = tsService.findLatest(tenantId, deviceId, Collections.singleton(STRING_KEY)).get(MAX_TIMEOUT, TimeUnit.SECONDS);
Assert.assertEquals(1, entries.size()); Assert.assertEquals(1, entries.size());
Assert.assertEquals(toTsEntry(TS - 1, stringKvEntry), entries.get(0)); equalsIgnoreVersion(toTsEntry(TS - 1, stringKvEntry), entries.get(0));
} }
@Test @Test
@ -794,5 +799,10 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
return new BasicTsKvEntry(ts, entry); return new BasicTsKvEntry(ts, entry);
} }
private static void equalsIgnoreVersion(TsKvEntry expected, TsKvEntry actual) {
assertEquals(expected.getKey(), actual.getKey());
assertEquals(expected.getValue(), actual.getValue());
assertEquals(expected.getTs(), actual.getTs());
}
} }