sparkplug: Test Telemetry comment3 adtests arrays

This commit is contained in:
nickAS21 2023-01-26 14:38:21 +02:00
parent 8ecde928dc
commit f9bf73f896
4 changed files with 281 additions and 128 deletions

View File

@ -39,7 +39,6 @@ import java.io.IOException;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
@ -122,40 +121,40 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return metric.toBuilder().setBytesValue(byteString).build(); return metric.toBuilder().setBytesValue(byteString).build();
case Int16Array: case Int16Array:
case UInt8Array: case UInt8Array:
byte[] int16Array = shortToByte_ByteBuffer_Method((short[]) value); byte[] int16Array = shortArrayToByteArray((short[]) value);
ByteString byteInt16Array = ByteString.copyFrom((int16Array)); ByteString byteInt16Array = ByteString.copyFrom((int16Array));
return metric.toBuilder().setBytesValue(byteInt16Array).build(); return metric.toBuilder().setBytesValue(byteInt16Array).build();
case Int32Array: case Int32Array:
case UInt16Array: case UInt16Array:
byte[] int32Array = integerToByte_ByteBuffer_Method((int[]) value);
ByteString byteInt32Array = ByteString.copyFrom((int32Array));
return metric.toBuilder().setBytesValue(byteInt32Array).build();
case Int64Array: case Int64Array:
case UInt32Array: case UInt32Array:
byte[] int64Array = longToByte_ByteBuffer_Method((long[]) value);
ByteString byteInt64Array = ByteString.copyFrom((int64Array));
return metric.toBuilder().setBytesValue(byteInt64Array).build();
case UInt64Array: case UInt64Array:
case DateTimeArray:
if (value instanceof int[]) {
byte[] int32Array = integerArrayToByteArray((int[]) value);
ByteString byteInt32Array = ByteString.copyFrom((int32Array));
return metric.toBuilder().setBytesValue(byteInt32Array).build();
} else {
byte[] int64Array = longArrayToByteArray((long[]) value);
ByteString byteInt64Array = ByteString.copyFrom((int64Array));
return metric.toBuilder().setBytesValue(byteInt64Array).build();
}
case DoubleArray: case DoubleArray:
byte[] doubleArray = doubleToByte_ByteBuffer_Method((double[]) value); byte[] doubleArray = doublArrayToByteArray((double[]) value);
ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
return metric.toBuilder().setBytesValue(byteDoubleArray).build(); return metric.toBuilder().setBytesValue(byteDoubleArray).build();
case FloatArray: case FloatArray:
byte[] floatArray = floatToByte_ByteBuffer_Method((float[]) value); byte[] floatArray = floatArrayToByteArray((float[]) value);
ByteString byteFloatArray = ByteString.copyFrom(floatArray); ByteString byteFloatArray = ByteString.copyFrom(floatArray);
return metric.toBuilder().setBytesValue(byteFloatArray).build(); return metric.toBuilder().setBytesValue(byteFloatArray).build();
case BooleanArray: case BooleanArray:
byte[] booleanArray = booleanToByte_ByteBuffer_Method((boolean[]) value); byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
return metric.toBuilder().setBytesValue(byteBooleanArray).build(); return metric.toBuilder().setBytesValue(byteBooleanArray).build();
case StringArray: case StringArray:
byte[] stringArray = stringToByte_ByteBuffer_Method((String[]) value); byte[] stringArray = stringArrayToByteArray((String[]) value);
ByteString byteStringArray = ByteString.copyFrom(stringArray); ByteString byteStringArray = ByteString.copyFrom(stringArray);
return metric.toBuilder().setBytesValue(byteStringArray).build(); return metric.toBuilder().setBytesValue(byteStringArray).build();
case DateTimeArray:
byte[] dateArray = dateToByte_ByteBuffer_Method((Date[]) value);
ByteString byteDateArray = ByteString.copyFrom(dateArray);
return metric.toBuilder().setBytesValue(byteDateArray).build();
case File: case File:
SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value; SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value;
ByteString byteFileString = ByteString.copyFrom(file.getBytes()); ByteString byteFileString = ByteString.copyFrom(file.getBytes());
@ -168,7 +167,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return metric; return metric;
} }
private byte[] shortToByte_ByteBuffer_Method(short[] inputs) { private byte[] shortArrayToByteArray(short[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2); ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2);
for (short d : inputs) { for (short d : inputs) {
bb.putShort(d); bb.putShort(d);
@ -176,15 +175,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return bb.array(); return bb.array();
} }
private byte[] integerToByte_ByteBuffer_Method(int[] inputs) { private byte[] integerArrayToByteArray(int[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4); ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4);
for (int d : inputs) { for (int d : inputs) {
bb.putLong(d); bb.putInt(d);
} }
return bb.array(); return bb.array();
} }
private byte[] longToByte_ByteBuffer_Method(long[] inputs) { private byte[] longArrayToByteArray(long[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (long d : inputs) { for (long d : inputs) {
bb.putLong(d); bb.putLong(d);
@ -192,7 +191,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return bb.array(); return bb.array();
} }
private byte[] doubleToByte_ByteBuffer_Method(double[] inputs) { private byte[] doublArrayToByteArray(double[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (double d : inputs) { for (double d : inputs) {
bb.putDouble(d); bb.putDouble(d);
@ -200,7 +199,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return bb.array(); return bb.array();
} }
private byte[] floatToByte_ByteBuffer_Method(float[] inputs) throws ThingsboardException { private byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException {
ByteArrayOutputStream bas = new ByteArrayOutputStream(); ByteArrayOutputStream bas = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bas); DataOutputStream ds = new DataOutputStream(bas);
for (float f : inputs) { for (float f : inputs) {
@ -213,19 +212,15 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return bas.toByteArray(); return bas.toByteArray();
} }
private byte[] booleanToByte_ByteBuffer_Method(boolean[] inputs) { private byte[] booleanArrayToByteArray(boolean[] inputs) {
byte[] toReturn = new byte[inputs.length / 8]; byte[] toReturn = new byte[inputs.length];
for (int entry = 0; entry < toReturn.length; entry++) { for (int entry = 0; entry < toReturn.length; entry++) {
for (int bit = 0; bit < 8; bit++) { toReturn[entry] = (byte) (inputs[entry]?1:0);
if (inputs[entry * 8 + bit]) {
toReturn[entry] |= (128 >> bit);
}
}
} }
return toReturn; return toReturn;
} }
private byte[] stringToByte_ByteBuffer_Method(String[] inputs) throws ThingsboardException { private byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException {
final ByteArrayOutputStream bas = new ByteArrayOutputStream(); final ByteArrayOutputStream bas = new ByteArrayOutputStream();
try { try {
final ObjectOutputStream os = new ObjectOutputStream(bas); final ObjectOutputStream os = new ObjectOutputStream(bas);
@ -238,16 +233,6 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return bas.toByteArray(); return bas.toByteArray();
} }
private byte[] dateToByte_ByteBuffer_Method(Date[] inputs) {
long[] ll = new long[inputs.length];
int i = 0;
for (Date date : inputs) {
ll[i] = date.getTime();
i++;
}
return longToByte_ByteBuffer_Method(ll);
}
private MqttWireMessage clientWithCorrectNodeAccessToken(MqttV5TestClient client) throws Exception { private MqttWireMessage clientWithCorrectNodeAccessToken(MqttV5TestClient client) throws Exception {
IMqttToken connectionResult = client.connectAndWait(gatewayAccessToken); IMqttToken connectionResult = client.connectAndWait(gatewayAccessToken);

View File

@ -15,13 +15,11 @@
*/ */
package org.thingsboard.server.transport.mqtt.sparkplug.timeseries; package org.thingsboard.server.transport.mqtt.sparkplug.timeseries;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.Assert; import org.junit.Assert;
import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry;
@ -37,20 +35,35 @@ import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.DateTimeArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.DoubleArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.FloatArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int16Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int8Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.StringArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt16Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8Array;
/** /**
* Created by nickAS21 on 12.01.23 * Created by nickAS21 on 12.01.23
@ -134,7 +147,6 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception {
processClientWithCorrectNodeAccess(); processClientWithCorrectNodeAccess();
String deviceName = deviceId + "_" + 10;
String messageTypeName = SparkplugMessageType.NDATA.name(); String messageTypeName = SparkplugMessageType.NDATA.name();
List<String> listKeys = new ArrayList<>(); List<String> listKeys = new ArrayList<>();
List<TsKvEntry> listTsKvEntry = new ArrayList<>(); List<TsKvEntry> listTsKvEntry = new ArrayList<>();
@ -144,7 +156,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
.setSeq(getSeqNum()); .setSeq(getSeqNum());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
createdAddMetricTsKv(listTsKvEntry, listKeys, ndataPayload, ts); createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) { if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
@ -152,7 +164,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
} }
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>(); AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NCMD.name()) await(alias + SparkplugMessageType.NDATA.name())
.atMost(40, TimeUnit.SECONDS) .atMost(40, TimeUnit.SECONDS)
.until(() -> { .until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
@ -162,8 +174,38 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry)); Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry));
} }
private void createdAddMetricTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys, protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple() throws Exception {
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException { processClientWithCorrectNodeAccess();
String messageTypeName = SparkplugMessageType.NDATA.name();
List<String> listKeys = new ArrayList<>();
List<TsKvEntry> listTsKvEntry = new ArrayList<>();
SparkplugBProto.Payload.Builder ndataPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis())
.setSeq(getSeqNum());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
createdAddMetricValueArraysTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NDATA.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
return finalFuture.get().get().size() == listTsKvEntry.size();
});
Assert.assertTrue("Expected tsKvEntrys is not equal Actual tsKvEntrys", listTsKvEntry.containsAll(finalFuture.get().get()));
Assert.assertTrue("Actual tsKvEntrys is not equal Expected tsKvEntrys", finalFuture.get().get().containsAll(listTsKvEntry));
}
private void createdAddMetricValuePrimitiveTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys,
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException {
String keys = "MyInt8"; String keys = "MyInt8";
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8)); listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextInt8(), ts, Int8));
@ -206,7 +248,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
listKeys.add(keys); listKeys.add(keys);
keys = "MyDateTime"; keys = "MyDateTime";
listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, ts, ts, MetricDataType.DateTime)); listTsKvEntry.add(createdAddMetricTsKvLong(dataPayload, keys, nextDateTime(), ts, MetricDataType.DateTime));
listKeys.add(keys); listKeys.add(keys);
keys = "MyDouble"; keys = "MyDouble";
@ -231,6 +273,65 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
} }
private void createdAddMetricValueArraysTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys,
SparkplugBProto.Payload.Builder dataPayload, long ts) throws ThingsboardException {
String keys = "MyBytesArray";
byte[] bytes = {nextInt8(), nextInt8(), nextInt8()};
createdAddMetricTsKvJson(dataPayload, keys, bytes, ts, Bytes, listTsKvEntry, listKeys);
keys = "MyInt8Array";
byte[] int8s = {nextInt8(), nextInt8(), nextInt8()};
createdAddMetricTsKvJson(dataPayload, keys, int8s, ts, Int8Array, listTsKvEntry, listKeys);
keys = "MyInt16Array";
short[] int16s = {nextInt16(), nextInt16(), nextInt16()};
createdAddMetricTsKvJson(dataPayload, keys, int16s, ts, Int16Array, listTsKvEntry, listKeys);
keys = "MyInt32Array";
int[] int32s = {nextInt32(), nextInt32(), nextInt32()};
createdAddMetricTsKvJson(dataPayload, keys, int32s, ts, Int32Array, listTsKvEntry, listKeys);
keys = "MyInt64Array";
long[] int64s = {nextInt64(), nextInt64(), nextInt64()};
createdAddMetricTsKvJson(dataPayload, keys, int64s, ts, Int64Array, listTsKvEntry, listKeys);
keys = "MyUInt8Array";
short[] uInt8s = {nextUInt16(), nextUInt16(), nextUInt16()};
createdAddMetricTsKvJson(dataPayload, keys, uInt8s, ts, UInt8Array, listTsKvEntry, listKeys);
keys = "MyUInt16Array";
int[] uInt16s = {nextUInt16(), nextUInt16(), nextUInt16()};
createdAddMetricTsKvJson(dataPayload, keys, uInt16s, ts, UInt16Array, listTsKvEntry, listKeys);
keys = "MyUInt32LArray";
long[] uInt32Ls = {nextUInt32L(), nextUInt32L(), nextUInt32L()};
createdAddMetricTsKvJson(dataPayload, keys, uInt32Ls, ts, UInt32Array, listTsKvEntry, listKeys);
keys = "MyUInt64Array";
long[] uInt64s = {nextUInt64(), nextUInt64(), nextUInt64()};
createdAddMetricTsKvJson(dataPayload, keys, uInt64s, ts, UInt64Array, listTsKvEntry, listKeys);
keys = "MyFloatArray";
float[] floats = {nextFloat(0,300), nextFloat(0,4000), nextFloat(10,10000)};
createdAddMetricTsKvJson(dataPayload, keys, floats, ts, FloatArray, listTsKvEntry, listKeys);
keys = "MyDateTimeArray";
long[] dateTimes = {nextDateTime(), nextDateTime(), nextDateTime()};
createdAddMetricTsKvJson(dataPayload, keys, dateTimes, ts, DateTimeArray, listTsKvEntry, listKeys);
keys = "MyDoubleArray";
double [] doubles = {nextDouble(), nextDouble(), nextDouble()};
createdAddMetricTsKvJson(dataPayload, keys, doubles, ts, DoubleArray, listTsKvEntry, listKeys);
keys = "MyBooleanArray";
boolean [] booleans = {nextBoolean(), nextBoolean(), nextBoolean()};
createdAddMetricTsKvJson(dataPayload, keys, booleans, ts, BooleanArray, listTsKvEntry, listKeys);
keys = "MyStringArray";
String [] strings = {nexString(), nexString(), nexString()};
createdAddMetricTsKvJson(dataPayload, keys, strings, ts, StringArray, listTsKvEntry, listKeys);
}
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException { long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value)))); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value))));
@ -269,11 +370,71 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
return tsKvEntry; return tsKvEntry;
} }
private TsKvEntry createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, String value, private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys,
long ts, MetricDataType metricDataType) throws ThingsboardException { Object values, long ts, MetricDataType metricDataType,
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new JsonDataEntry(keys, value)); List<TsKvEntry> listTsKvEntry,
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); List<String> listKeys) throws ThingsboardException {
return tsKvEntry; ArrayNode nodeArray = newArrayNode();
switch (metricDataType) {
case Bytes:
case Int8Array:
for (byte b : (byte[])values) {
nodeArray.add(b);
}
break;
case Int16Array:
case UInt8Array:
for (short b : (short[])values) {
nodeArray.add(b);
}
break;
case Int32Array:
case UInt16Array:
case Int64Array:
case UInt32Array:
case UInt64Array:
case DateTimeArray:
if (values instanceof int[]) {
for (int b : (int[])values) {
nodeArray.add(b);
}
} else {
for (long b : (long[])values) {
nodeArray.add(b);
}
}
break;
case DoubleArray:
for (double b : (double[])values) {
nodeArray.add(b);
}
break;
case FloatArray:
for (float b : (float[])values) {
nodeArray.add(b);
}
break;
case BooleanArray:
for (boolean b : (boolean[])values) {
nodeArray.add(b);
}
break;
case StringArray:
for (String b : (String[])values) {
nodeArray.add(b);
}
break;
default:
throw new IllegalStateException("Unexpected value: " + metricDataType);
}
if (nodeArray.size() > 0) {
Optional<TsKvEntry> tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(keys, nodeArray.toString())));
if (tsKvEntryOptional.isPresent()) {
dataPayload.addMetrics(createMetric(values, tsKvEntryOptional.get(), metricDataType));
listTsKvEntry.add(tsKvEntryOptional.get());
listKeys.add(keys);
}
}
} }
private byte nextInt8() { private byte nextInt8() {
@ -318,6 +479,12 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE); return random.nextDouble(Long.MIN_VALUE, Long.MAX_VALUE);
} }
private long nextDateTime() {
long min = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long max = calendar.getTimeInMillis();
return random.nextLong(min, max);
}
private float nextFloat(float min, float max) { private float nextFloat(float min, float max) {
if (min >= max) if (min >= max)
throw new IllegalArgumentException("max must be greater than min"); throw new IllegalArgumentException("max must be greater than min");
@ -327,7 +494,6 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
return result; return result;
} }
private boolean nextBoolean() { private boolean nextBoolean() {
return random.nextBoolean(); return random.nextBoolean();
} }
@ -336,25 +502,4 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
return java.util.UUID.randomUUID().toString(); return java.util.UUID.randomUUID().toString();
} }
private List<String> getActualKeysList(DeviceId deviceId, List<String> expectedKeys, long start) throws Exception {
// long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 3000;
List<String> actualKeys = null;
while (start <= end) {
actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() {
});
Map<String, List<JsonNode>> timeseries = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" +
savedGateway.getId().getId() + "/values/timeseries?keys=" + expectedKeys.get(6), new TypeReference<>() {
});
if (actualKeys.size() == expectedKeys.size()) {
break;
}
Thread.sleep(300);
start += 100;
}
return actualKeys;
}
} }

View File

@ -53,4 +53,9 @@ public class MqttV5ClientSparkplugBTelemetryTest extends AbstractMqttV5ClientSpa
processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple(); processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple();
} }
@Test
public void testClientWithCorrectAccessTokenPushNodeMetricBuildPArraysSimple() throws Exception {
processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple();
}
} }

View File

@ -25,15 +25,19 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.DoubleBuffer;
import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import static org.thingsboard.common.util.JacksonUtil.newArrayNode; import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
/** /**
* Provides utility methods for SparkplugB MQTT Payload Metric. * Provides utility methods for SparkplugB MQTT Payload Metric.
@ -91,42 +95,85 @@ public class SparkplugMetricUtil {
case UUID: case UUID:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoMetric.getStringValue()).build()); .setStringV(protoMetric.getStringValue()).build());
case Bytes: // byte[]
case BooleanArray: case BooleanArray:
ByteBuffer booleanByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
while (booleanByteBuffer.hasRemaining()){
nodeArray.add(booleanByteBuffer.get() == (byte) 0 ? false : true);
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// byte[]
case Bytes:
case Int8Array: case Int8Array:
ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
while (byteBuffer.hasRemaining()){
nodeArray.add(byteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// short[]
case Int16Array: case Int16Array:
case Int32Array:
case UInt8Array: case UInt8Array:
ShortBuffer shortByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asShortBuffer();
while (shortByteBuffer.hasRemaining()){
nodeArray.add(shortByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// int[]
case Int32Array:
case UInt16Array: case UInt16Array:
IntBuffer intByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asIntBuffer();
while (intByteBuffer.hasRemaining()){
nodeArray.add(intByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// float[]
case FloatArray: case FloatArray:
FloatBuffer floatByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asFloatBuffer();
while (floatByteBuffer.hasRemaining()){
nodeArray.add(floatByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// double[]
case DoubleArray: case DoubleArray:
DoubleBuffer doubleByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asDoubleBuffer();
while (doubleByteBuffer.hasRemaining()){
nodeArray.add(doubleByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build());
// long[]
case DateTimeArray: case DateTimeArray:
case Int64Array: case Int64Array:
case UInt64Array: case UInt64Array:
case UInt32Array: case UInt32Array:
ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); LongBuffer longByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) { .asLongBuffer();
byteBuffer.order(ByteOrder.LITTLE_ENDIAN); while (longByteBuffer.hasRemaining()){
} nodeArray.add(longByteBuffer.get());
while (byteBuffer.hasRemaining()) {
setValueToNodeArray(nodeArray, byteBuffer, metricType);
} }
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build()); .setJsonV(nodeArray.toString()).build());
case StringArray: case StringArray:
ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); final ByteArrayInputStream byteArrayInputStream =
StringBuilder sb = new StringBuilder(); new ByteArrayInputStream(stringByteBuffer.array());
while (stringByteBuffer.hasRemaining()) { final ObjectInputStream objectInputStream =
byte b = stringByteBuffer.get(); new ObjectInputStream(byteArrayInputStream);
if (b == (byte) 0) { final String[] stringArray = (String[]) objectInputStream.readObject();
nodeArray.add(sb.toString()); objectInputStream.close();
sb = new StringBuilder(); for (String s: stringArray) {
} else { nodeArray.add(s);
sb.append((char) b);
}
} }
return Optional.of(builderProto.setKey(key) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(nodeArray.toString()).build()); .setJsonV(nodeArray.toString()).build());
case DataSet: case DataSet:
case Template: case Template:
@ -167,35 +214,6 @@ public class SparkplugMetricUtil {
} }
} }
private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) {
switch (MetricDataType.fromInteger(metricType)) {
case Bytes:
nodeArray.add(byteBuffer.get());
break;
case BooleanArray:
nodeArray.add(byteBuffer.get() == (byte) 0 ? "false" : "true");
break;
case Int8Array:
case Int16Array:
case Int32Array:
case UInt8Array:
case UInt16Array:
nodeArray.add(byteBuffer.getInt());
break;
case FloatArray:
nodeArray.add(byteBuffer.getFloat());
break;
case DoubleArray:
nodeArray.add(byteBuffer.getDouble());
break;
case DateTimeArray:
case Int64Array:
case UInt64Array:
case UInt32Array:
nodeArray.add(byteBuffer.getLong());
}
}
@JsonIgnoreProperties( @JsonIgnoreProperties(
value = {"fileName"}) value = {"fileName"})
@JsonSerialize( @JsonSerialize(