From f9bf73f896531ca4ab5f6c4dc5dc6ba74b5fb285 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 26 Jan 2023 14:38:21 +0200 Subject: [PATCH] sparkplug: Test Telemetry comment3 adtests arrays --- .../AbstractMqttV5ClientSparkplugTest.java | 65 ++---- ...actMqttV5ClientSparkplugTelemetryTest.java | 219 +++++++++++++++--- .../MqttV5ClientSparkplugBTelemetryTest.java | 5 + .../util/sparkplug/SparkplugMetricUtil.java | 120 ++++++---- 4 files changed, 281 insertions(+), 128 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 879629005c..bef5bf4770 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -39,7 +39,6 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.Calendar; -import java.util.Date; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; @@ -122,40 +121,40 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return metric.toBuilder().setBytesValue(byteString).build(); case Int16Array: case UInt8Array: - byte[] int16Array = shortToByte_ByteBuffer_Method((short[]) value); + byte[] int16Array = shortArrayToByteArray((short[]) value); ByteString byteInt16Array = ByteString.copyFrom((int16Array)); return metric.toBuilder().setBytesValue(byteInt16Array).build(); case Int32Array: case UInt16Array: - byte[] int32Array = integerToByte_ByteBuffer_Method((int[]) value); - ByteString byteInt32Array = ByteString.copyFrom((int32Array)); - return metric.toBuilder().setBytesValue(byteInt32Array).build(); case Int64Array: case UInt32Array: - byte[] int64Array = longToByte_ByteBuffer_Method((long[]) value); - ByteString byteInt64Array = ByteString.copyFrom((int64Array)); - return metric.toBuilder().setBytesValue(byteInt64Array).build(); case UInt64Array: + case DateTimeArray: + if (value instanceof int[]) { + byte[] int32Array = integerArrayToByteArray((int[]) value); + ByteString byteInt32Array = ByteString.copyFrom((int32Array)); + return metric.toBuilder().setBytesValue(byteInt32Array).build(); + } else { + byte[] int64Array = longArrayToByteArray((long[]) value); + ByteString byteInt64Array = ByteString.copyFrom((int64Array)); + return metric.toBuilder().setBytesValue(byteInt64Array).build(); + } case DoubleArray: - byte[] doubleArray = doubleToByte_ByteBuffer_Method((double[]) value); + byte[] doubleArray = doublArrayToByteArray((double[]) value); ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); return metric.toBuilder().setBytesValue(byteDoubleArray).build(); case FloatArray: - byte[] floatArray = floatToByte_ByteBuffer_Method((float[]) value); + byte[] floatArray = floatArrayToByteArray((float[]) value); ByteString byteFloatArray = ByteString.copyFrom(floatArray); return metric.toBuilder().setBytesValue(byteFloatArray).build(); case BooleanArray: - byte[] booleanArray = booleanToByte_ByteBuffer_Method((boolean[]) value); + byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); return metric.toBuilder().setBytesValue(byteBooleanArray).build(); case StringArray: - byte[] stringArray = stringToByte_ByteBuffer_Method((String[]) value); + byte[] stringArray = stringArrayToByteArray((String[]) value); ByteString byteStringArray = ByteString.copyFrom(stringArray); return metric.toBuilder().setBytesValue(byteStringArray).build(); - case DateTimeArray: - byte[] dateArray = dateToByte_ByteBuffer_Method((Date[]) value); - ByteString byteDateArray = ByteString.copyFrom(dateArray); - return metric.toBuilder().setBytesValue(byteDateArray).build(); case File: SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value; ByteString byteFileString = ByteString.copyFrom(file.getBytes()); @@ -168,7 +167,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return metric; } - private byte[] shortToByte_ByteBuffer_Method(short[] inputs) { + private byte[] shortArrayToByteArray(short[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2); for (short d : inputs) { bb.putShort(d); @@ -176,15 +175,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return bb.array(); } - private byte[] integerToByte_ByteBuffer_Method(int[] inputs) { + private byte[] integerArrayToByteArray(int[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4); for (int d : inputs) { - bb.putLong(d); + bb.putInt(d); } return bb.array(); } - private byte[] longToByte_ByteBuffer_Method(long[] inputs) { + private byte[] longArrayToByteArray(long[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); for (long d : inputs) { bb.putLong(d); @@ -192,7 +191,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return bb.array(); } - private byte[] doubleToByte_ByteBuffer_Method(double[] inputs) { + private byte[] doublArrayToByteArray(double[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); for (double d : inputs) { bb.putDouble(d); @@ -200,7 +199,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return bb.array(); } - private byte[] floatToByte_ByteBuffer_Method(float[] inputs) throws ThingsboardException { + private byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException { ByteArrayOutputStream bas = new ByteArrayOutputStream(); DataOutputStream ds = new DataOutputStream(bas); for (float f : inputs) { @@ -213,19 +212,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return bas.toByteArray(); } - private byte[] booleanToByte_ByteBuffer_Method(boolean[] inputs) { - byte[] toReturn = new byte[inputs.length / 8]; + private byte[] booleanArrayToByteArray(boolean[] inputs) { + byte[] toReturn = new byte[inputs.length]; for (int entry = 0; entry < toReturn.length; entry++) { - for (int bit = 0; bit < 8; bit++) { - if (inputs[entry * 8 + bit]) { - toReturn[entry] |= (128 >> bit); - } - } + toReturn[entry] = (byte) (inputs[entry]?1:0); } return toReturn; } - private byte[] stringToByte_ByteBuffer_Method(String[] inputs) throws ThingsboardException { + private byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException { final ByteArrayOutputStream bas = new ByteArrayOutputStream(); try { final ObjectOutputStream os = new ObjectOutputStream(bas); @@ -238,16 +233,6 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return bas.toByteArray(); } - private byte[] dateToByte_ByteBuffer_Method(Date[] inputs) { - long[] ll = new long[inputs.length]; - int i = 0; - for (Date date : inputs) { - ll[i] = date.getTime(); - i++; - } - return longToByte_ByteBuffer_Method(ll); - } - private MqttWireMessage clientWithCorrectNodeAccessToken(MqttV5TestClient client) throws Exception { IMqttToken connectionResult = client.connectAndWait(gatewayAccessToken); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index 82bf6d52d6..e2892aaa3a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -15,13 +15,11 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.timeseries; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.thingsboard.server.common.data.exception.ThingsboardException; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; @@ -37,20 +35,35 @@ import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; +import static org.thingsboard.common.util.JacksonUtil.newArrayNode; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.DateTimeArray; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.DoubleArray; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.FloatArray; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8Array; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.StringArray; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8Array; /** * Created by nickAS21 on 12.01.23 @@ -134,7 +147,6 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { processClientWithCorrectNodeAccess(); - String deviceName = deviceId + "_" + 10; String messageTypeName = SparkplugMessageType.NDATA.name(); List listKeys = new ArrayList<>(); List listTsKvEntry = new ArrayList<>(); @@ -144,7 +156,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac .setSeq(getSeqNum()); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; - createdAddMetricTsKv(listTsKvEntry, listKeys, ndataPayload, ts); + createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); if (client.isConnected()) { client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, @@ -152,7 +164,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac } AtomicReference>> finalFuture = new AtomicReference<>(); - await(alias + SparkplugMessageType.NCMD.name()) + await(alias + SparkplugMessageType.NDATA.name()) .atMost(40, TimeUnit.SECONDS) .until(() -> { finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); @@ -162,8 +174,38 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); } - private void createdAddMetricTsKv(List listTsKvEntry, List listKeys, - SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { + protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple() throws Exception { + processClientWithCorrectNodeAccess(); + + String messageTypeName = SparkplugMessageType.NDATA.name(); + List listKeys = new ArrayList<>(); + List listTsKvEntry = new ArrayList<>(); + + SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(calendar.getTimeInMillis()) + .setSeq(getSeqNum()); + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + + createdAddMetricValueArraysTsKv(listTsKvEntry, listKeys, ndataPayload, ts); + + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, + ndataPayload.build().toByteArray(), 0, false); + } + + AtomicReference>> finalFuture = new AtomicReference<>(); + await(alias + SparkplugMessageType.NDATA.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); + return finalFuture.get().get().size() == listTsKvEntry.size(); + }); + Assert.assertTrue("Expected tsKvEntrys is not equal Actual tsKvEntrys", listTsKvEntry.containsAll(finalFuture.get().get())); + Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); + } + + private void createdAddMetricValuePrimitiveTsKv(List listTsKvEntry, List listKeys, + SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { String keys = "MyInt8"; listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8)); @@ -206,7 +248,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac listKeys.add(keys); keys = "MyDateTime"; - listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, ts, ts, MetricDataType.DateTime)); + listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextDateTime(), ts, MetricDataType.DateTime)); listKeys.add(keys); keys = "MyDouble"; @@ -231,6 +273,65 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac } + private void createdAddMetricValueArraysTsKv(List listTsKvEntry, List listKeys, + SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { + String keys = "MyBytesArray"; + byte[] bytes = {nextInt8(), nextInt8(), nextInt8()}; + createdAddMetricTsKvJson(dataPayload, keys, bytes, ts, Bytes, listTsKvEntry, listKeys); + + keys = "MyInt8Array"; + byte[] int8s = {nextInt8(), nextInt8(), nextInt8()}; + createdAddMetricTsKvJson(dataPayload, keys, int8s, ts, Int8Array, listTsKvEntry, listKeys); + + keys = "MyInt16Array"; + short[] int16s = {nextInt16(), nextInt16(), nextInt16()}; + createdAddMetricTsKvJson(dataPayload, keys, int16s, ts, Int16Array, listTsKvEntry, listKeys); + + keys = "MyInt32Array"; + int[] int32s = {nextInt32(), nextInt32(), nextInt32()}; + createdAddMetricTsKvJson(dataPayload, keys, int32s, ts, Int32Array, listTsKvEntry, listKeys); + + keys = "MyInt64Array"; + long[] int64s = {nextInt64(), nextInt64(), nextInt64()}; + createdAddMetricTsKvJson(dataPayload, keys, int64s, ts, Int64Array, listTsKvEntry, listKeys); + + keys = "MyUInt8Array"; + short[] uInt8s = {nextUInt16(), nextUInt16(), nextUInt16()}; + createdAddMetricTsKvJson(dataPayload, keys, uInt8s, ts, UInt8Array, listTsKvEntry, listKeys); + + keys = "MyUInt16Array"; + int[] uInt16s = {nextUInt16(), nextUInt16(), nextUInt16()}; + createdAddMetricTsKvJson(dataPayload, keys, uInt16s, ts, UInt16Array, listTsKvEntry, listKeys); + + keys = "MyUInt32LArray"; + long[] uInt32Ls = {nextUInt32L(), nextUInt32L(), nextUInt32L()}; + createdAddMetricTsKvJson(dataPayload, keys, uInt32Ls, ts, UInt32Array, listTsKvEntry, listKeys); + + keys = "MyUInt64Array"; + long[] uInt64s = {nextUInt64(), nextUInt64(), nextUInt64()}; + createdAddMetricTsKvJson(dataPayload, keys, uInt64s, ts, UInt64Array, listTsKvEntry, listKeys); + + keys = "MyFloatArray"; + float[] floats = {nextFloat(0,300), nextFloat(0,4000), nextFloat(10,10000)}; + createdAddMetricTsKvJson(dataPayload, keys, floats, ts, FloatArray, listTsKvEntry, listKeys); + + keys = "MyDateTimeArray"; + long[] dateTimes = {nextDateTime(), nextDateTime(), nextDateTime()}; + createdAddMetricTsKvJson(dataPayload, keys, dateTimes, ts, DateTimeArray, listTsKvEntry, listKeys); + + keys = "MyDoubleArray"; + double [] doubles = {nextDouble(), nextDouble(), nextDouble()}; + createdAddMetricTsKvJson(dataPayload, keys, doubles, ts, DoubleArray, listTsKvEntry, listKeys); + + keys = "MyBooleanArray"; + boolean [] booleans = {nextBoolean(), nextBoolean(), nextBoolean()}; + createdAddMetricTsKvJson(dataPayload, keys, booleans, ts, BooleanArray, listTsKvEntry, listKeys); + + keys = "MyStringArray"; + String [] strings = {nexString(), nexString(), nexString()}; + createdAddMetricTsKvJson(dataPayload, keys, strings, ts, StringArray, listTsKvEntry, listKeys); + } + private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, long ts, MetricDataType metricDataType) throws ThingsboardException { TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value)))); @@ -269,11 +370,71 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac return tsKvEntry; } - private TsKvEntry createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, String value, - long ts, MetricDataType metricDataType) throws ThingsboardException { - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(keys, value)); - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); - return tsKvEntry; + private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, + Object values, long ts, MetricDataType metricDataType, + List listTsKvEntry, + List listKeys) throws ThingsboardException { + ArrayNode nodeArray = newArrayNode(); + switch (metricDataType) { + case Bytes: + case Int8Array: + for (byte b : (byte[])values) { + nodeArray.add(b); + } + break; + case Int16Array: + case UInt8Array: + for (short b : (short[])values) { + nodeArray.add(b); + } + break; + case Int32Array: + case UInt16Array: + case Int64Array: + case UInt32Array: + case UInt64Array: + case DateTimeArray: + if (values instanceof int[]) { + for (int b : (int[])values) { + nodeArray.add(b); + } + } else { + for (long b : (long[])values) { + nodeArray.add(b); + } + } + break; + case DoubleArray: + for (double b : (double[])values) { + nodeArray.add(b); + } + break; + case FloatArray: + for (float b : (float[])values) { + nodeArray.add(b); + } + break; + case BooleanArray: + for (boolean b : (boolean[])values) { + nodeArray.add(b); + } + break; + case StringArray: + for (String b : (String[])values) { + nodeArray.add(b); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + metricDataType); + } + if (nodeArray.size() > 0) { + Optional tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(keys, nodeArray.toString()))); + if (tsKvEntryOptional.isPresent()) { + dataPayload.addMetrics(createMetric(values, tsKvEntryOptional.get(), metricDataType)); + listTsKvEntry.add(tsKvEntryOptional.get()); + listKeys.add(keys); + } + } } private byte nextInt8() { @@ -318,6 +479,12 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE); } + private long nextDateTime() { + long min = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + long max = calendar.getTimeInMillis(); + return random.nextLong(min, max); + } + private float nextFloat(float min, float max) { if (min >= max) throw new IllegalArgumentException("max must be greater than min"); @@ -327,7 +494,6 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac return result; } - private boolean nextBoolean() { return random.nextBoolean(); } @@ -336,25 +502,4 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac return java.util.UUID.randomUUID().toString(); } - private List getActualKeysList(DeviceId deviceId, List expectedKeys, long start) throws Exception { -// long start = System.currentTimeMillis(); - long end = System.currentTimeMillis() + 3000; - - List actualKeys = null; - while (start <= end) { - actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() { - }); - - Map> timeseries = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + - savedGateway.getId().getId() + "/values/timeseries?keys=" + expectedKeys.get(6), new TypeReference<>() { - }); - if (actualKeys.size() == expectedKeys.size()) { - break; - } - Thread.sleep(300); - start += 100; - } - return actualKeys; - } - } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java index 63aec2dba2..917d1a9f65 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/MqttV5ClientSparkplugBTelemetryTest.java @@ -53,4 +53,9 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple(); } + @Test + public void testClientWithCorrectAccessTokenPushNodeMetricBuildPArraysSimple() throws Exception { + processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple(); + } + } \ No newline at end of file diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index e03f2a0664..c24d8a5d36 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -25,15 +25,19 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.nio.ByteOrder; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; import java.util.Arrays; import java.util.Optional; import static org.thingsboard.common.util.JacksonUtil.newArrayNode; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; /** * Provides utility methods for SparkplugB MQTT Payload Metric. @@ -91,42 +95,85 @@ public class SparkplugMetricUtil { case UUID: return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) .setStringV(protoMetric.getStringValue()).build()); - case Bytes: + // byte[] case BooleanArray: + ByteBuffer booleanByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); + while (booleanByteBuffer.hasRemaining()){ + nodeArray.add(booleanByteBuffer.get() == (byte) 0 ? false : true); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // byte[] + case Bytes: case Int8Array: + ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); + while (byteBuffer.hasRemaining()){ + nodeArray.add(byteBuffer.get()); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // short[] case Int16Array: - case Int32Array: case UInt8Array: + ShortBuffer shortByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) + .asShortBuffer(); + while (shortByteBuffer.hasRemaining()){ + nodeArray.add(shortByteBuffer.get()); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // int[] + case Int32Array: case UInt16Array: + IntBuffer intByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) + .asIntBuffer(); + while (intByteBuffer.hasRemaining()){ + nodeArray.add(intByteBuffer.get()); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // float[] case FloatArray: + FloatBuffer floatByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) + .asFloatBuffer(); + while (floatByteBuffer.hasRemaining()){ + nodeArray.add(floatByteBuffer.get()); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // double[] case DoubleArray: + DoubleBuffer doubleByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) + .asDoubleBuffer(); + while (doubleByteBuffer.hasRemaining()){ + nodeArray.add(doubleByteBuffer.get()); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + // long[] case DateTimeArray: case Int64Array: case UInt64Array: case UInt32Array: - ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) { - byteBuffer.order(ByteOrder.LITTLE_ENDIAN); - } - while (byteBuffer.hasRemaining()) { - setValueToNodeArray(nodeArray, byteBuffer, metricType); + LongBuffer longByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) + .asLongBuffer(); + while (longByteBuffer.hasRemaining()){ + nodeArray.add(longByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) .setJsonV(nodeArray.toString()).build()); case StringArray: ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); - StringBuilder sb = new StringBuilder(); - while (stringByteBuffer.hasRemaining()) { - byte b = stringByteBuffer.get(); - if (b == (byte) 0) { - nodeArray.add(sb.toString()); - sb = new StringBuilder(); - } else { - sb.append((char) b); - } + final ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(stringByteBuffer.array()); + final ObjectInputStream objectInputStream = + new ObjectInputStream(byteArrayInputStream); + final String[] stringArray = (String[]) objectInputStream.readObject(); + objectInputStream.close(); + for (String s: stringArray) { + nodeArray.add(s); } - return Optional.of(builderProto.setKey(key) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) .setJsonV(nodeArray.toString()).build()); case DataSet: case Template: @@ -167,35 +214,6 @@ public class SparkplugMetricUtil { } } - private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) { - switch (MetricDataType.fromInteger(metricType)) { - case Bytes: - nodeArray.add(byteBuffer.get()); - break; - case BooleanArray: - nodeArray.add(byteBuffer.get() == (byte) 0 ? "false" : "true"); - break; - case Int8Array: - case Int16Array: - case Int32Array: - case UInt8Array: - case UInt16Array: - nodeArray.add(byteBuffer.getInt()); - break; - case FloatArray: - nodeArray.add(byteBuffer.getFloat()); - break; - case DoubleArray: - nodeArray.add(byteBuffer.getDouble()); - break; - case DateTimeArray: - case Int64Array: - case UInt64Array: - case UInt32Array: - nodeArray.add(byteBuffer.getLong()); - } - } - @JsonIgnoreProperties( value = {"fileName"}) @JsonSerialize(