From ee1209b19f4e842aff42a07b81d9c579574d2f32 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Fri, 22 Mar 2024 12:01:36 +0200 Subject: [PATCH] Fix KvProtoUtils order for matching KeyValueType and DataType --- .../device/DeviceActorMessageProcessor.java | 5 +- .../queue/DefaultTbCoreConsumerService.java | 8 +- .../server/common/data/EntityType.java | 3 +- .../server/common/data/kv/DataType.java | 15 +- .../server/common/util/KvProtoUtil.java | 130 +++++++---------- .../server/common/util/KvProtoUtilTest.java | 131 ++++++++++++++++++ 6 files changed, 203 insertions(+), 89 deletions(-) create mode 100644 common/proto/src/test/java/org/thingsboard/server/common/util/KvProtoUtilTest.java diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index d6b5da4ddf..d5fc8055d3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; @@ -68,7 +69,6 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.common.util.KvProtoUtil; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; @@ -97,7 +97,6 @@ import org.thingsboard.server.service.rpc.RpcSubmitStrategy; import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; -import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -605,7 +604,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) { List attributes = new ArrayList<>(msg.getValues()); if (attributes.size() > 0) { - List sharedUpdated = msg.getValues().stream().map(t -> KvProtoUtil.toTsKvProto(t.getLastUpdateTs(), t)) + List sharedUpdated = msg.getValues().stream().map(t -> KvProtoUtil.toProto(t.getLastUpdateTs(), t)) .collect(Collectors.toList()); if (!sharedUpdated.isEmpty()) { notification.addAllSharedUpdated(sharedUpdated); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index fd08b9da4d..fbbb65d4ef 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -18,6 +18,8 @@ package org.thingsboard.server.service.queue; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -50,9 +52,9 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.common.util.KvProtoUtil; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.resource.ImageCacheKey; -import org.thingsboard.server.common.util.KvProtoUtil; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; @@ -101,8 +103,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -583,7 +583,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService dataTypeByProtoNumber[dataType.getProtoNumber()] = dataType); + } + + public static List toAttributeKvList(List dataList) { + List result = new ArrayList<>(dataList.size()); + dataList.forEach(proto -> result.add(new BaseAttributeKvEntry(fromProto(proto.getKv()), proto.getTs()))); + return result; + } + public static List attrToTsKvProtos(List result) { List clientAttributes; if (result == null || result.isEmpty()) { @@ -41,115 +56,70 @@ public class KvProtoUtil { } else { clientAttributes = new ArrayList<>(result.size()); for (AttributeKvEntry attrEntry : result) { - clientAttributes.add(toTsKvProto(attrEntry.getLastUpdateTs(), attrEntry)); + clientAttributes.add(toProto(attrEntry.getLastUpdateTs(), attrEntry)); } } return clientAttributes; } - - public static List tsToTsKvProtos(List result) { + public static List toProtoList(List result) { List ts; if (result == null || result.isEmpty()) { ts = Collections.emptyList(); } else { ts = new ArrayList<>(result.size()); for (TsKvEntry attrEntry : result) { - ts.add(toTsKvProto(attrEntry.getTs(), attrEntry)); + ts.add(toProto(attrEntry.getTs(), attrEntry)); } } return ts; } - public static TransportProtos.TsKvProto toTsKvProto(long ts, KvEntry kvEntry) { - return TransportProtos.TsKvProto.newBuilder().setTs(ts) - .setKv(KvProtoUtil.toKeyValueProto(kvEntry)).build(); + public static List fromProtoList(List dataList) { + List result = new ArrayList<>(dataList.size()); + dataList.forEach(proto -> result.add(new BasicTsKvEntry(proto.getTs(), fromProto(proto.getKv())))); + return result; } - public static TransportProtos.KeyValueProto toKeyValueProto(KvEntry kvEntry) { + public static TransportProtos.TsKvProto toProto(long ts, KvEntry kvEntry) { + return TransportProtos.TsKvProto.newBuilder().setTs(ts) + .setKv(KvProtoUtil.toProto(kvEntry)).build(); + } + + public static TsKvEntry fromProto(TransportProtos.TsKvProto proto) { + return new BasicTsKvEntry(proto.getTs(), fromProto(proto.getKv())); + } + + public static TransportProtos.KeyValueProto toProto(KvEntry kvEntry) { TransportProtos.KeyValueProto.Builder builder = TransportProtos.KeyValueProto.newBuilder(); builder.setKey(kvEntry.getKey()); + builder.setType(toProto(kvEntry.getDataType())); switch (kvEntry.getDataType()) { - case BOOLEAN: - builder.setType(TransportProtos.KeyValueType.BOOLEAN_V); - builder.setBoolV(kvEntry.getBooleanValue().get()); - break; - case DOUBLE: - builder.setType(TransportProtos.KeyValueType.DOUBLE_V); - builder.setDoubleV(kvEntry.getDoubleValue().get()); - break; - case LONG: - builder.setType(TransportProtos.KeyValueType.LONG_V); - builder.setLongV(kvEntry.getLongValue().get()); - break; - case STRING: - builder.setType(TransportProtos.KeyValueType.STRING_V); - builder.setStringV(kvEntry.getStrValue().get()); - break; - case JSON: - builder.setType(TransportProtos.KeyValueType.JSON_V); - builder.setJsonV(kvEntry.getJsonValue().get()); - break; + case BOOLEAN -> kvEntry.getBooleanValue().ifPresent(builder::setBoolV); + case LONG -> kvEntry.getLongValue().ifPresent(builder::setLongV); + case DOUBLE -> kvEntry.getDoubleValue().ifPresent(builder::setDoubleV); + case JSON -> kvEntry.getJsonValue().ifPresent(builder::setJsonV); + case STRING -> kvEntry.getStrValue().ifPresent(builder::setStringV); } return builder.build(); } - public static TransportProtos.TsKvProto.Builder toKeyValueProto(long ts, KvEntry attr) { - TransportProtos.KeyValueProto.Builder dataBuilder = TransportProtos.KeyValueProto.newBuilder(); - dataBuilder.setKey(attr.getKey()); - dataBuilder.setType(TransportProtos.KeyValueType.forNumber(attr.getDataType().ordinal())); - switch (attr.getDataType()) { - case BOOLEAN: - attr.getBooleanValue().ifPresent(dataBuilder::setBoolV); - break; - case LONG: - attr.getLongValue().ifPresent(dataBuilder::setLongV); - break; - case DOUBLE: - attr.getDoubleValue().ifPresent(dataBuilder::setDoubleV); - break; - case JSON: - attr.getJsonValue().ifPresent(dataBuilder::setJsonV); - break; - case STRING: - attr.getStrValue().ifPresent(dataBuilder::setStringV); - break; - } - return TransportProtos.TsKvProto.newBuilder().setTs(ts).setKv(dataBuilder); + public static KvEntry fromProto(TransportProtos.KeyValueProto proto) { + return switch (fromProto(proto.getType())) { + case BOOLEAN -> new BooleanDataEntry(proto.getKey(), proto.getBoolV()); + case LONG -> new LongDataEntry(proto.getKey(), proto.getLongV()); + case DOUBLE -> new DoubleDataEntry(proto.getKey(), proto.getDoubleV()); + case STRING -> new StringDataEntry(proto.getKey(), proto.getStringV()); + case JSON -> new JsonDataEntry(proto.getKey(), proto.getJsonV()); + }; } - public static List toTsKvEntityList(List dataList) { - List result = new ArrayList<>(dataList.size()); - dataList.forEach(proto -> result.add(new BasicTsKvEntry(proto.getTs(), getKvEntry(proto.getKv())))); - return result; + public static TransportProtos.KeyValueType toProto(DataType dataType) { + return TransportProtos.KeyValueType.forNumber(dataType.getProtoNumber()); } - public static List toAttributeKvList(List dataList) { - List result = new ArrayList<>(dataList.size()); - dataList.forEach(proto -> result.add(new BaseAttributeKvEntry(getKvEntry(proto.getKv()), proto.getTs()))); - return result; + public static DataType fromProto(TransportProtos.KeyValueType keyValueType) { + return dataTypeByProtoNumber[keyValueType.getNumber()]; } - private static KvEntry getKvEntry(TransportProtos.KeyValueProto proto) { - KvEntry entry = null; - DataType type = DataType.values()[proto.getType().getNumber()]; - switch (type) { - case BOOLEAN: - entry = new BooleanDataEntry(proto.getKey(), proto.getBoolV()); - break; - case LONG: - entry = new LongDataEntry(proto.getKey(), proto.getLongV()); - break; - case DOUBLE: - entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleV()); - break; - case STRING: - entry = new StringDataEntry(proto.getKey(), proto.getStringV()); - break; - case JSON: - entry = new JsonDataEntry(proto.getKey(), proto.getJsonV()); - break; - } - return entry; - } } diff --git a/common/proto/src/test/java/org/thingsboard/server/common/util/KvProtoUtilTest.java b/common/proto/src/test/java/org/thingsboard/server/common/util/KvProtoUtilTest.java new file mode 100644 index 0000000000..603d4b3aa6 --- /dev/null +++ b/common/proto/src/test/java/org/thingsboard/server/common/util/KvProtoUtilTest.java @@ -0,0 +1,131 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.util; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.kv.AggTsKvEntry; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +class KvProtoUtilTest { + + @Test + void protoDataTypeSerialization() { + for (DataType dataType : DataType.values()) { + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(dataType))).as(dataType.name()).isEqualTo(dataType); + } + } + + @Test + void protoKeyValueProtoSerialization() { + String key = "key"; + KvEntry kvEntry = new BooleanDataEntry(key, true); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new LongDataEntry(key, 23L); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new DoubleDataEntry(key, 23.0); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new StringDataEntry(key, "stringValue"); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new JsonDataEntry(key, "jsonValue"); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(kvEntry))).as("deserialized").isEqualTo(kvEntry); + } + + @Test + void protoTsKvEntrySerialization() { + String key = "key"; + long ts = System.currentTimeMillis(); + KvEntry kvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, true)); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(ts, kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, 23L)); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(ts, kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, 23.0)); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(ts, kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, "stringValue")); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(ts, kvEntry))).as("deserialized").isEqualTo(kvEntry); + + kvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(key, "jsonValue")); + assertThat(KvProtoUtil.fromProto(KvProtoUtil.toProto(ts, kvEntry))).as("deserialized").isEqualTo(kvEntry); + } + + @Test + void protoListTsKvEntrySerialization() { + String key = "key"; + long ts = System.currentTimeMillis(); + KvEntry booleanDataEntry = new BooleanDataEntry(key, true); + KvEntry longDataEntry = new LongDataEntry(key, 23L); + KvEntry doubleDataEntry = new DoubleDataEntry(key, 23.0); + KvEntry stringDataEntry = new StringDataEntry(key, "stringValue"); + KvEntry jsonDataEntry = new JsonDataEntry(key, "jsonValue"); + List protoList = List.of( + new BasicTsKvEntry(ts, booleanDataEntry), + new BasicTsKvEntry(ts, longDataEntry), + new BasicTsKvEntry(ts, doubleDataEntry), + new BasicTsKvEntry(ts, stringDataEntry), + new BasicTsKvEntry(ts, jsonDataEntry) + ); + assertThat(KvProtoUtil.fromProtoList(KvProtoUtil.toProtoList(protoList))).as("deserialized").isEqualTo(protoList); + + protoList = List.of( + new AggTsKvEntry(ts, booleanDataEntry, 3), + new AggTsKvEntry(ts, longDataEntry, 5), + new AggTsKvEntry(ts, doubleDataEntry, 2), + new AggTsKvEntry(ts, stringDataEntry, 1), + new AggTsKvEntry(ts, jsonDataEntry, 0) + ); + assertThat(KvProtoUtil.fromProtoList(KvProtoUtil.toProtoList(protoList))).as("deserialized").isEqualTo(protoList); + } + + @Test + void protoListAttributeKvSerialization() { + String key = "key"; + long ts = System.currentTimeMillis(); + KvEntry booleanDataEntry = new BooleanDataEntry(key, true); + KvEntry longDataEntry = new LongDataEntry(key, 23L); + KvEntry doubleDataEntry = new DoubleDataEntry(key, 23.0); + KvEntry stringDataEntry = new StringDataEntry(key, "stringValue"); + KvEntry jsonDataEntry = new JsonDataEntry(key, "jsonValue"); + List protoList = List.of( + new BaseAttributeKvEntry(ts, booleanDataEntry), + new BaseAttributeKvEntry(ts, longDataEntry), + new BaseAttributeKvEntry(ts, doubleDataEntry), + new BaseAttributeKvEntry(ts, stringDataEntry), + new BaseAttributeKvEntry(ts, jsonDataEntry) + ); + assertThat(KvProtoUtil.toAttributeKvList(KvProtoUtil.attrToTsKvProtos(protoList))).as("deserialized").isEqualTo(protoList); + } + +}