From d7d4d861b34c06ab0284d65f70692466e01f09b4 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Wed, 1 Feb 2023 18:37:12 +0200 Subject: [PATCH] sparkplug: add metricsBirth --- .../AbstractMqttV5ClientSparkplugTest.java | 204 ++----------- ...ctMqttV5ClientSparkplugConnectionTest.java | 75 ++--- .../MqttV5ClientSparkplugBConnectionTest.java | 21 +- ...actMqttV5ClientSparkplugTelemetryTest.java | 77 +++-- .../transport/mqtt/MqttTransportHandler.java | 137 +++++++-- .../AbstractGatewaySessionHandler.java | 13 +- .../MqttDeviceAwareSessionContext.java | 15 +- ...ava => SparkplugDeviceSessionContext.java} | 32 +- .../session/SparkplugNodeSessionHandler.java | 139 +++++---- .../util/sparkplug/SparkplugMetricUtil.java | 284 +++++++++++++++++- 10 files changed, 592 insertions(+), 405 deletions(-) rename common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/{SparkplugSessionCtx.java => SparkplugDeviceSessionContext.java} (50%) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index bef5bf4770..d57fb2e511 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -15,32 +15,23 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.mqttv5.client.IMqttToken; -import org.eclipse.paho.mqttv5.common.packet.MqttConnAck; -import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; -import org.junit.Assert; import org.thingsboard.server.common.data.TransportPayloadType; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; -import org.thingsboard.server.common.data.exception.ThingsboardException; -import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; -import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.nio.ByteBuffer; import java.util.Calendar; -import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; /** * Created by nickAS21 on 12.01.23 @@ -71,171 +62,28 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte processBeforeTest(configProperties); } - public void processClientWithCorrectNodeAccess() throws Exception { + public MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH() throws Exception { + long ts = calendar.getTimeInMillis(); + long value = bdSeq = 0; + return clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); + } + + public MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(long ts, long value) throws Exception { + String key = keysBdSeq; + MetricDataType metricDataType = Int64; + SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(calendar.getTimeInMillis()); + deathPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + byte[] deathBytes = deathPayload.build().toByteArray(); this.client = new MqttV5TestClient(); - MqttWireMessage response = clientWithCorrectNodeAccessToken(client); - Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType()); - MqttConnAck connAckMsg = (MqttConnAck) response; - Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); - } - - protected SparkplugBProto.Payload.Metric createMetric(Object value, TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException { - SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder() - .setTimestamp(tsKvEntry.getTs()) - .setName(tsKvEntry.getKey()) - .setDatatype(metricDataType.toIntValue()) - .build(); - switch (metricDataType) { - case Int8: - case Int16: - case UInt8: - case UInt16: - int valueMetric = Integer.valueOf(String.valueOf(value)); - return metric.toBuilder().setIntValue(valueMetric).build(); - case Int32: - case UInt32: - if (value instanceof Long) { - return metric.toBuilder().setLongValue((long) value).build(); - } else { - return metric.toBuilder().setIntValue((int)value).build(); - } - case Int64: - case UInt64: - case DateTime: - return metric.toBuilder().setLongValue((long) value).build(); - case Float: - return metric.toBuilder().setFloatValue((float) value).build(); - case Double: - return metric.toBuilder().setDoubleValue((double) value).build(); - case Boolean: - return metric.toBuilder().setBooleanValue((boolean) value).build(); - case String: - case Text: - case UUID: - return metric.toBuilder().setStringValue((String) value).build(); - case DataSet: - return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build(); - case Bytes: - case Int8Array: - ByteString byteString = ByteString.copyFrom((byte[]) value); - return metric.toBuilder().setBytesValue(byteString).build(); - case Int16Array: - case UInt8Array: - byte[] int16Array = shortArrayToByteArray((short[]) value); - ByteString byteInt16Array = ByteString.copyFrom((int16Array)); - return metric.toBuilder().setBytesValue(byteInt16Array).build(); - case Int32Array: - case UInt16Array: - case Int64Array: - case UInt32Array: - case UInt64Array: - case DateTimeArray: - if (value instanceof int[]) { - byte[] int32Array = integerArrayToByteArray((int[]) value); - ByteString byteInt32Array = ByteString.copyFrom((int32Array)); - return metric.toBuilder().setBytesValue(byteInt32Array).build(); - } else { - byte[] int64Array = longArrayToByteArray((long[]) value); - ByteString byteInt64Array = ByteString.copyFrom((int64Array)); - return metric.toBuilder().setBytesValue(byteInt64Array).build(); - } - case DoubleArray: - byte[] doubleArray = doublArrayToByteArray((double[]) value); - ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); - return metric.toBuilder().setBytesValue(byteDoubleArray).build(); - case FloatArray: - byte[] floatArray = floatArrayToByteArray((float[]) value); - ByteString byteFloatArray = ByteString.copyFrom(floatArray); - return metric.toBuilder().setBytesValue(byteFloatArray).build(); - case BooleanArray: - byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); - ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); - return metric.toBuilder().setBytesValue(byteBooleanArray).build(); - case StringArray: - byte[] stringArray = stringArrayToByteArray((String[]) value); - ByteString byteStringArray = ByteString.copyFrom(stringArray); - return metric.toBuilder().setBytesValue(byteStringArray).build(); - case File: - SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value; - ByteString byteFileString = ByteString.copyFrom(file.getBytes()); - return metric.toBuilder().setBytesValue(byteFileString).build(); - case Template: - return metric.toBuilder().setTemplateValue((SparkplugBProto.Payload.Template) value).build(); - case Unknown: - throw new ThingsboardException("Invalid value for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS); - } - return metric; - } - - private byte[] shortArrayToByteArray(short[] inputs) { - ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2); - for (short d : inputs) { - bb.putShort(d); - } - return bb.array(); - } - - private byte[] integerArrayToByteArray(int[] inputs) { - ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4); - for (int d : inputs) { - bb.putInt(d); - } - return bb.array(); - } - - private byte[] longArrayToByteArray(long[] inputs) { - ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); - for (long d : inputs) { - bb.putLong(d); - } - return bb.array(); - } - - private byte[] doublArrayToByteArray(double[] inputs) { - ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); - for (double d : inputs) { - bb.putDouble(d); - } - return bb.array(); - } - - private byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException { - ByteArrayOutputStream bas = new ByteArrayOutputStream(); - DataOutputStream ds = new DataOutputStream(bas); - for (float f : inputs) { - try { - ds.writeFloat(f); - } catch (IOException e) { - throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS); - } - } - return bas.toByteArray(); - } - - private byte[] booleanArrayToByteArray(boolean[] inputs) { - byte[] toReturn = new byte[inputs.length]; - for (int entry = 0; entry < toReturn.length; entry++) { - toReturn[entry] = (byte) (inputs[entry]?1:0); - } - return toReturn; - } - - private byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException { - final ByteArrayOutputStream bas = new ByteArrayOutputStream(); - try { - final ObjectOutputStream os = new ObjectOutputStream(bas); - os.writeObject(inputs); - os.flush(); - os.close(); - } catch (Exception e) { - throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS); - } - return bas.toByteArray(); - } - - - private MqttWireMessage clientWithCorrectNodeAccessToken(MqttV5TestClient client) throws Exception { - IMqttToken connectionResult = client.connectAndWait(gatewayAccessToken); + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setUserName(gatewayAccessToken); + String topic = NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode; + MqttMessage msg = new MqttMessage(); + msg.setId(0); + msg.setPayload(deathBytes); + options.setWill(topic, msg); + IMqttToken connectionResult = client.connect(options); return connectionResult.getResponse(); } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index c0f9bf6797..c02c23e213 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -17,9 +17,7 @@ package org.thingsboard.server.transport.mqtt.sparkplug.connection; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.mqttv5.client.IMqttToken; -import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; -import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.packet.MqttConnAck; import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; @@ -34,16 +32,16 @@ import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSpark import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; +import java.util.HashSet; import java.util.Optional; import java.util.Set; -import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; /** * Created by nickAS21 on 12.01.23 @@ -51,21 +49,10 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp @Slf4j public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest { - protected void processClientWithCorrectNodeAccessTokenTest() throws Exception { - processClientWithCorrectNodeAccess(); - } - - protected void processClientWithCorrectNodeAccessTokenWithNdeathTest() throws Exception { + protected void processClientWithCorrectNodeAccessTokenWithNDEATH_Test() throws Exception { long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS; long value = bdSeq = 0; - MetricDataType metricDataType = Int64; - TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, value)); - - SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder() - .setTimestamp(calendar.getTimeInMillis()); - deathPayload.addMetrics(createMetric(value, tsKvEntryBdSecOriginal, metricDataType)); - - MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(deathPayload.build().toByteArray()); + MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType()); @@ -86,15 +73,23 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry); } - protected void processClientWithCorrectAccessTokenCreatedDevices(int cntDevices) throws Exception { - processClientWithCorrectNodeAccess(); + protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception { + this.client = new MqttV5TestClient(); + MqttException actualException = Assert.assertThrows(MqttException.class, () -> client.connectAndWait(gatewayAccessToken)); + String expectedMessage = "Server unavailable."; + int expectedReasonCode = 136; + Assert.assertEquals(expectedMessage, actualException.getMessage()); + Assert.assertEquals( expectedReasonCode, actualException.getReasonCode()); + } + + protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); long ts = calendar.getTimeInMillis(); MetricDataType metricDataType = Int32; Set deviceIds = new HashSet<>(); - String keys = "Device Metric int32"; + String key = "Device Metric int32"; int valueDeviceInt32 = 1024; - TsKvEntry expectedTsKvEntryDeviceInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueDeviceInt32))); - SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, expectedTsKvEntryDeviceInt32, metricDataType); + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); for (int i=0; i < cntDevices; i++ ) { SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()) @@ -105,36 +100,18 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr if (client.isConnected()) { client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, payloadBirthDevice.build().toByteArray(), 0, false); - deviceIds.add(deviceName); + AtomicReference device = new AtomicReference<>(); + await(alias + "find device [" + deviceName + "] after created") + .atMost(200, TimeUnit.SECONDS) + .until(() -> { + device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); + return device.get() != null; + }); } + deviceIds.add(deviceName); } - Assert.assertEquals(cntDevices, deviceIds.size()); - - for (String deviceName: deviceIds) { - AtomicReference device = new AtomicReference<>(); - await(alias + "find device [" + deviceName + "] after crete") - .atMost(40, TimeUnit.SECONDS) - .until(() -> { - device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); - return device.get() != null; - }); - } } - private MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(byte[] deathBytes) throws Exception { - this.client = new MqttV5TestClient(); - MqttConnectionOptions options = new MqttConnectionOptions(); - options.setUserName(gatewayAccessToken); - if (deathBytes != null) { - String topic = NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode; - MqttMessage msg = new MqttMessage(); - msg.setId(0); - msg.setPayload(deathBytes); - options.setWill(topic, msg); - } - IMqttToken connectionResult = client.connect(options); - return connectionResult.getResponse(); - } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java index 85f708d0f8..6c928358ef 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java @@ -39,24 +39,25 @@ public class MqttV5ClientSparkplugBConnectionTest extends AbstractMqttV5ClientSp } } - @Test - public void testClientWithCorrectAccessToken() throws Exception { - processClientWithCorrectNodeAccessTokenTest(); - } - @Test public void testClientWithCorrectAccessTokenWithNDEATH() throws Exception { - processClientWithCorrectNodeAccessTokenWithNdeathTest(); + processClientWithCorrectNodeAccessTokenWithNDEATH_Test(); } @Test - public void testClientWithCorrectAccessTokenCreatedOneDevice() throws Exception { - processClientWithCorrectAccessTokenCreatedDevices(1); + public void testClientWithCorrectNodeAccessTokenWithoutNDEATH() throws Exception { + processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test(); + } + + + @Test + public void testClientWithCorrectAccessTokenWithNDEATHCreatedOneDevice() throws Exception { + processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1); } @Test - public void testClientWithCorrectAccessTokenCreatedTwoDevice() throws Exception { - processClientWithCorrectAccessTokenCreatedDevices(2); + public void testClientWithCorrectAccessTokenWithNDEATHCreatedTwoDevice() throws Exception { + processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(2); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index e2892aaa3a..af283c3603 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -64,6 +64,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64Array; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8Array; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; /** * Created by nickAS21 on 12.01.23 @@ -74,30 +75,28 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac protected ThreadLocalRandom random = ThreadLocalRandom.current(); protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { - processClientWithCorrectNodeAccess(); + clientWithCorrectNodeAccessTokenWithNDEATH(); List listKeys = new ArrayList<>(); SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long valueBdSec = getBdSeqNum(); MetricDataType metricDataType = Int64; - TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); - payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType)); + String key = keysBdSeq; + payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, key, metricDataType)); listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); - String keys = "Node Control/Rebirth"; + key = "Node Control/Rebirth"; boolean valueRebirth = false; metricDataType = MetricDataType.Boolean; - TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); - payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType)); - listKeys.add(keys); + payloadBirthNode.addMetrics(createMetric(valueRebirth, ts, key, metricDataType)); + listKeys.add(key); - keys = "Node Metric int32"; + key = "Node Metric int32"; int valueNodeInt32 = 1024; metricDataType = Int32; - TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32))); - payloadBirthNode.addMetrics(createMetric(valueNodeInt32, expectedSsKvEntryNodeInt32, metricDataType)); - listKeys.add(keys); + payloadBirthNode.addMetrics(createMetric(valueNodeInt32, ts, key, metricDataType)); + listKeys.add(key); client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); @@ -113,23 +112,22 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac } protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { - processClientWithCorrectNodeAccess(); + clientWithCorrectNodeAccessTokenWithNDEATH(); SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); List listKeys = new ArrayList<>(); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long valueBdSec = getBdSeqNum(); MetricDataType metricDataType = Int64; - TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec)); - payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType)); + String key = keysBdSeq; + payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, key, metricDataType)); listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq); - String keys = "Node Control/Rebirth"; + key = "Node Control/Rebirth"; boolean valueRebirth = true; metricDataType = MetricDataType.Boolean; - TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth)); - payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType)); - listKeys.add(keys); + payloadBirthNode.addMetrics(createMetric(valueRebirth, ts, key, metricDataType)); + listKeys.add(key); client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); @@ -145,7 +143,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac } protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception { - processClientWithCorrectNodeAccess(); + clientWithCorrectNodeAccessTokenWithNDEATH();; String messageTypeName = SparkplugMessageType.NDATA.name(); List listKeys = new ArrayList<>(); @@ -175,7 +173,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac } protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple() throws Exception { - processClientWithCorrectNodeAccess(); + clientWithCorrectNodeAccessTokenWithNDEATH();; String messageTypeName = SparkplugMessageType.NDATA.name(); List listKeys = new ArrayList<>(); @@ -332,45 +330,44 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac createdAddMetricTsKvJson(dataPayload, keys, strings, ts, StringArray, listTsKvEntry, listKeys); } - private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, + private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String key, Object value, long ts, MetricDataType metricDataType) throws ThingsboardException { - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value)))); - - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, Long.valueOf(String.valueOf(value)))); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); return tsKvEntry; } - private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value, + private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String key, Object value, long ts, MetricDataType metricDataType) throws ThingsboardException { var f = new BigDecimal(String.valueOf(value)); - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(keys, f.doubleValue())); - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, f.doubleValue())); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); return tsKvEntry; } - private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String keys, double value, + private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String key, double value, long ts, MetricDataType metricDataType) throws ThingsboardException { var d = new BigDecimal(String.valueOf(value)); - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, d.longValueExact())); - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, d.longValueExact())); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); return tsKvEntry; } - private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String keys, boolean value, + private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String key, boolean value, long ts, MetricDataType metricDataType) throws ThingsboardException { - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, value)); - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, value)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); return tsKvEntry; } - private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String keys, String value, + private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String key, String value, long ts, MetricDataType metricDataType) throws ThingsboardException { - TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, value)); - dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType)); + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, value)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); return tsKvEntry; } - private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys, + private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String key, Object values, long ts, MetricDataType metricDataType, List listTsKvEntry, List listKeys) throws ThingsboardException { @@ -428,11 +425,11 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac throw new IllegalStateException("Unexpected value: " + metricDataType); } if (nodeArray.size() > 0) { - Optional tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(keys, nodeArray.toString()))); + Optional tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, nodeArray.toString()))); if (tsKvEntryOptional.isPresent()) { - dataPayload.addMetrics(createMetric(values, tsKvEntryOptional.get(), metricDataType)); + dataPayload.addMetrics(createMetric(values, ts, key, metricDataType)); listTsKvEntry.add(tsKvEntryOptional.get()); - listKeys.add(keys); + listKeys.add(key); } } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 747c26e6fd..e53dcd520b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; @@ -107,6 +108,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe; @@ -390,13 +392,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement // TODO break; case NBIRTH: + sparkplugSessionHandler.setMetricsBirthNode (sparkplugBProtoNode.getMetricsList()); + sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); + break; case NCMD: case NDATA: sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; - case NDEATH: - sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); - break; case NRECORD: // TODO break; @@ -410,16 +412,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement // TODO break; case DBIRTH: - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); + System.out.println(); break; case DCMD: case DDATA: sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); break; + /** + * TODO + * 7.3.2. Device Death Certificate (DDEATH) + * The Sparkplug™ Topic Namespace for a device Death Certificate is: + * namespace/group_id/DDEATH/edge_node_id/device_id + * It is the responsibility of the MQTT EoN node to indicate the real-time state of either physical legacy device using + * poll/response protocols and/or local logical devices. If the device becomes unavailable for any reason (no + * response, CRC error, etc.) it is the responsibility of the EoN node to publish a DDEATH on behalf of the end device. + * Immediately upon reception of a DDEATH, any MQTT client subscribed to this device should set the data quality of + * all metrics to “STALE” and should note the time stamp when the DDEATH message was received. + */ case DDEATH: - sparkplugSessionHandler.onDeviceDisconnect(mqttMsg); + sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, deviceName); break; case DRECORD: // TODO @@ -437,6 +450,62 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void handleSparkplugSubscribeMsg(List grantedQoSList, MqttTopicSubscription subscription, MqttQoS reqQoS) throws ThingsboardException { + SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicName()); + if (sparkplugTopic.getGroupId() == null) { + // TODO SUBSCRIBE NameSpace + } else if (sparkplugTopic.getType() == null) { + // TODO SUBSCRIBE GroupId + } else if (sparkplugTopic.isNode()) { + // A node topic + processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1); + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case NBIRTH: + // TODO + break; + case NCMD: + // TODO + break; + case NDATA: + // TODO + break; + case NDEATH: + // TODO + break; + case NRECORD: + // TODO + break; + default: + } + } else { + // A device topic + switch (sparkplugTopic.getType()) { + case STATE: + // TODO + break; + case DBIRTH: + // TODO + break; + case DCMD: + // TODO + break; + case DDATA: + // TODO + break; + case DDEATH: + // TODO + break; + case DRECORD: + // TODO + break; + default: + } + } + } + private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { Matcher fwMatcher; @@ -698,8 +767,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement MqttQoS reqQoS = subscription.qualityOfService(); try { if (sparkplugSessionHandler != null) { - SparkplugTopic sparkplugTopic = parseTopicSubscribe(mqttMsg.payload().topicSubscriptions().get(0).topicName()); - sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS); + handleSparkplugSubscribeMsg(grantedQoSList, subscription, reqQoS); + activityReported = true; } else { switch (topic) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { @@ -1060,24 +1129,47 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void checkSparkplugSession(MqttConnectMessage connectMessage) { + /** + * Sparkplug™ Specification Version 2.2 + * 7.1.1. EoN Node Death Certificate (NDEATH) + * The Death Certificate topic for an MQTT EoN node is: + * namespace/group_id/NDEATH/edge_node_id + * The Death Certificate topic and payload described here are not “published” as an MQTT message by a client, but + * provided as parameters within the MQTT CONNECT control packet when this MQTT EoN node first establishes the + * MQTT Client session. + */ + + private void checkSparkplugNodeSession(MqttConnectMessage connectMessage, ChannelHandlerContext ctx) { try { - if (sparkplugSessionHandler == null && validatedSparkplugTopic(connectMessage)) { - SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic()); - sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopicNode); - sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, - deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); + if (sparkplugSessionHandler == null) { + SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); + if (sparkplugTopicNode != null) { + SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopicNode); + sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, + deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); + } else { + log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName()); + throw new ThingsboardException("Invalid request body", ThingsboardErrorCode.BAD_REQUEST_PARAMS); + } } } catch (Exception e) { - log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); + log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); + ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); + ctx.close(); } } - private boolean validatedSparkplugTopic (MqttConnectMessage connectMessage) { - return StringUtils.isNotBlank(connectMessage.payload().willTopic()) + private SparkplugTopic validatedSparkplugTopicConnectedNode (MqttConnectMessage connectMessage) throws ThingsboardException { + if(StringUtils.isNotBlank(connectMessage.payload().willTopic()) && connectMessage.payload().willMessageInBytes() != null - && connectMessage.payload().willMessageInBytes().length > 0; + && connectMessage.payload().willMessageInBytes().length > 0) { + SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic()); + if(NDEATH.equals(sparkplugTopicNode.getType())){ + return sparkplugTopicNode; + } + } + return null; } @Override @@ -1127,7 +1219,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); if (deviceSessionCtx.isSparkplug()) { - checkSparkplugSession(connectMessage); + checkSparkplugNodeSession(connectMessage, ctx); } else { checkGatewaySession(sessionMetaData); } @@ -1169,7 +1261,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement String topic = attrSubTopicType.getAttributesSubTopic(); MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType); try { - adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + if (sparkplugSessionHandler != null) { + log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); + sparkplugSessionHandler.createMqttPublishMsg(deviceSessionCtx, notification).ifPresent(sparkplugSessionHandler::writeAndFlush); + } else { + adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } } catch (Exception e) { log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index bf0d9230db..a6361978b3 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -186,7 +186,7 @@ public abstract class AbstractGatewaySessionHandler { } } - ChannelFuture writeAndFlush(MqttMessage mqttMessage) { + public ChannelFuture writeAndFlush(MqttMessage mqttMessage) { return channel.writeAndFlush(mqttMessage); } @@ -215,7 +215,7 @@ public abstract class AbstractGatewaySessionHandler { }, context.getExecutor()); } - private ListenableFuture onDeviceConnect(String deviceName, String deviceType) { + ListenableFuture onDeviceConnect(String deviceName, String deviceType) { MqttDeviceAwareSessionContext result = devices.get(deviceName); if (result == null) { Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock()); @@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler { new TransportServiceCallback<>() { @Override public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ; + GatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ; if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); @@ -282,8 +282,8 @@ public abstract class AbstractGatewaySessionHandler { } } - private AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { - return this.deviceSessionCtx.isSparkplug() ? new SparkplugSessionCtx(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) : + private GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) { + return this.deviceSessionCtx.isSparkplug() ? new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) : new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); } @@ -324,7 +324,7 @@ public abstract class AbstractGatewaySessionHandler { } } - private void processOnDisconnect(MqttPublishMessage msg, String deviceName) { + void processOnDisconnect(MqttPublishMessage msg, String deviceName) { deregisterSession(deviceName); ack(msg, ReturnCode.SUCCESS); } @@ -716,6 +716,7 @@ public abstract class AbstractGatewaySessionHandler { private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) { transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); + System.out.println("Removed device " + deviceName + " from the gateway session"); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index ded99daf60..5b4a9a4c95 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -19,11 +19,10 @@ import io.netty.handler.codec.mqtt.MqttQoS; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -33,23 +32,25 @@ import java.util.stream.Collectors; public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { private final ConcurrentMap mqttQoSMap; - private final Set metricBirth = new HashSet<>(); + private final Map metricsBirthDevice; public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap mqttQoSMap) { super(sessionId); this.mqttQoSMap = mqttQoSMap; + this.metricsBirthDevice = new ConcurrentHashMap<>(); } public ConcurrentMap getMqttQoSMap() { return mqttQoSMap; } - public Set getMetricBirth() { - return metricBirth; + public Map getMetricsBirthDevice() { + return metricsBirthDevice; } - public void setMetricBirth(java.util.List metrics) { - this.metricBirth.addAll(metrics); + public void setMetricsBirthDevice(java.util.List metrics) { + this.metricsBirthDevice.putAll(metrics.stream() + .collect(Collectors.toMap(metric -> metric.getName(), metric -> metric))); } public MqttQoS getQoSForTopic(String topic) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java similarity index 50% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index 9a87dbfe84..a6f0d0d247 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -15,47 +15,31 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -/** - * Created by nickAS21 on 08.12.22 - */ @Slf4j -public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext { +public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ - public SparkplugSessionCtx(AbstractGatewaySessionHandler parent, - TransportDeviceInfo deviceInfo, - DeviceProfile deviceProfile, - ConcurrentMap mqttQoSMap, - TransportService transportService) { + public SparkplugDeviceSessionContext(AbstractGatewaySessionHandler parent, + TransportDeviceInfo deviceInfo, + DeviceProfile deviceProfile, + ConcurrentMap mqttQoSMap, + TransportService transportService) { super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); } @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); - createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush); - } - - private Optional createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notification) { - try { - // TODO metrics from notification & MetricsDBIRTH - byte[] payloadInBytes = new byte[3]; - String topic = ((SparkplugNodeSessionHandler) parent).sparkplugTopicNode.toString() + "/" + deviceInfo.getDeviceName(); - return Optional.of(parent.getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes)); - } catch (Exception e) { - log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e); - return Optional.empty(); - } + ((SparkplugNodeSessionHandler)parent).createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index d36d6ba2f4..d72a69f111 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -22,9 +22,7 @@ import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.protobuf.Descriptors; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.adaptor.AdaptorException; @@ -32,19 +30,24 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; +import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getFromSparkplugBMetricToKeyValueProto; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric; /** * Created by nickAS21 on 12.12.22 @@ -52,13 +55,24 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi @Slf4j public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { - final SparkplugTopic sparkplugTopicNode; + private final SparkplugTopic sparkplugTopicNode; + private final Map metricsBirthNode; - public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, SparkplugTopic sparkplugTopicNode) { + public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, + SparkplugTopic sparkplugTopicNode) { super(deviceSessionCtx, sessionId); this.sparkplugTopicNode = sparkplugTopicNode; + this.metricsBirthNode = new ConcurrentHashMap<>(); } + public void setMetricsBirthNode(java.util.List metrics) { + this.metricsBirthNode.putAll(metrics.stream() + .collect(Collectors.toMap(metric -> metric.getName(), metric -> metric))); + } + + public Map getMetricsBirthNode() { + return this.metricsBirthNode; + } public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; @@ -72,14 +86,14 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException { + public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { checkDeviceName(deviceName); ListenableFuture contextListenableFuture = topic.isNode() ? - Futures.immediateFuture(this.deviceSessionCtx) : checkDeviceConnected(deviceName); + Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { try { - contextListenableFuture.get().setMetricBirth(sparkplugBProto.getMetricsList()); + contextListenableFuture.get().setMetricsBirthDevice(sparkplugBProto.getMetricsList()); } catch (InterruptedException | ExecutionException e) { log.error("Failed add Metrics. MessageType *BIRTH.", e); } @@ -115,57 +129,21 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void handleSparkplugSubscribeMsg(List grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) { - if (sparkplugTopic.getGroupId() == null) { - // TODO SUBSCRIBE NameSpace - } else if (sparkplugTopic.getType() == null) { - // TODO SUBSCRIBE GroupId - } else if (sparkplugTopic.isNode()) { - // A node topic - switch (sparkplugTopic.getType()) { - case STATE: - // TODO - break; - case NBIRTH: - // TODO - break; - case NCMD: - // TODO - break; - case NDATA: - // TODO - break; - case NDEATH: - // TODO - break; - case NRECORD: - // TODO - break; - default: - } - } else { - // A device topic - switch (sparkplugTopic.getType()) { - case STATE: - // TODO - break; - case DBIRTH: - // TODO - break; - case DCMD: - // TODO - break; - case DDATA: - // TODO - break; - case DDEATH: - // TODO - break; - case DRECORD: - // TODO - break; - default: - } + public void onDeviceDisconnect(MqttPublishMessage mqttMsg, String deviceName) throws AdaptorException { + try { + processOnDisconnect(mqttMsg, deviceName); + } catch (RuntimeException e) { + throw new AdaptorException(e); + } + } + + private ListenableFuture onDeviceConnectProto(String deviceName) throws ThingsboardException { + try { + String deviceType = this.gateway.getDeviceType() + "-node"; + return onDeviceConnect(deviceName, deviceType); + } catch (RuntimeException e) { + log.error("Failed Sparkplug Device connect proto!", e); + throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS); } } @@ -176,7 +154,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { long ts = protoMetric.getTimestamp(); String keys = "bdSeq".equals(protoMetric.getName()) ? topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); - Optional keyValueProtoOpt = getFromSparkplugBMetricToKeyValueProto(keys, protoMetric); + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(keys, protoMetric); if (keyValueProtoOpt.isPresent()) { List result = new ArrayList<>(); result.add(keyValueProtoOpt.get()); @@ -209,17 +187,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onDeviceConnectProto(MqttPublishMessage mqttPublishMessage, String nodeDeviceType) throws ThingsboardException { + public SparkplugTopic getSparkplugTopicNode() { + return this.sparkplugTopicNode; + } + + + public Optional createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, + TransportProtos.AttributeUpdateNotificationMsg notification, + String... deviceName) { try { - String topic = mqttPublishMessage.variableHeader().topicName(); - SparkplugTopic sparkplugTopic = parseTopicSubscribe(topic); - String deviceName = checkDeviceName(sparkplugTopic.getDeviceId()); - String deviceType = StringUtils.isEmpty(nodeDeviceType) ? DEFAULT_DEVICE_TYPE : nodeDeviceType; - processOnConnect(mqttPublishMessage, deviceName, deviceType); - } catch (RuntimeException | ThingsboardException e) { - log.error("Failed Sparkplug Device connect proto!", e); - throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS); + long ts = notification.getSharedUpdated(0).getTs(); + String key = notification.getSharedUpdated(0).getKv().getKey(); + if (metricsBirthNode.containsKey(key)) { + SparkplugBProto.Payload.Metric metricBirth = metricsBirthNode.get(key); + MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype()); + Optional value = validatedValueByTypeMetric(notification.getSharedUpdated(0).getKv(), metricDataType); + if (value.isPresent()) { + SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts); + cmdPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + byte[] payloadInBytes = cmdPayload.build().toByteArray(); + String topic = deviceName == null ? sparkplugTopicNode.toString() : sparkplugTopicNode.toString() + + "/" + deviceName; + return Optional.of(getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes)); + } + } else { + return Optional.empty(); + } + } catch (Exception e) { + log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e); + return Optional.empty(); } + return Optional.empty(); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index c24d8a5d36..c89c21b01a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.ser.std.FileSerializer; +import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; @@ -26,7 +27,11 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.DoubleBuffer; @@ -45,7 +50,7 @@ import static org.thingsboard.common.util.JacksonUtil.newArrayNode; @Slf4j public class SparkplugMetricUtil { - public static Optional getFromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { + public static Optional fromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { // Check if the null flag has been set indicating that the value is null if (protoMetric.getIsNull()) { return Optional.empty(); @@ -214,6 +219,283 @@ public class SparkplugMetricUtil { } } + public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException { + SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder() + .setTimestamp(ts) + .setName(key) + .setDatatype(metricDataType.toIntValue()) + .build(); + switch (metricDataType) { + case Int8: + case Int16: + case UInt8: + case UInt16: + int valueMetric = Integer.valueOf(String.valueOf(value)); + return metric.toBuilder().setIntValue(valueMetric).build(); + case Int32: + case UInt32: + if (value instanceof Long) { + return metric.toBuilder().setLongValue((long) value).build(); + } else { + return metric.toBuilder().setIntValue((int)value).build(); + } + case Int64: + case UInt64: + case DateTime: + return metric.toBuilder().setLongValue((long) value).build(); + case Float: + return metric.toBuilder().setFloatValue((float) value).build(); + case Double: + return metric.toBuilder().setDoubleValue((double) value).build(); + case Boolean: + return metric.toBuilder().setBooleanValue((boolean) value).build(); + case String: + case Text: + case UUID: + return metric.toBuilder().setStringValue((String) value).build(); + case Bytes: + case Int8Array: + ByteString byteString = ByteString.copyFrom((byte[]) value); + return metric.toBuilder().setBytesValue(byteString).build(); + case Int16Array: + case UInt8Array: + byte[] int16Array = shortArrayToByteArray((short[]) value); + ByteString byteInt16Array = ByteString.copyFrom((int16Array)); + return metric.toBuilder().setBytesValue(byteInt16Array).build(); + case Int32Array: + case UInt16Array: + case Int64Array: + case UInt32Array: + case UInt64Array: + case DateTimeArray: + if (value instanceof int[]) { + byte[] int32Array = integerArrayToByteArray((int[]) value); + ByteString byteInt32Array = ByteString.copyFrom((int32Array)); + return metric.toBuilder().setBytesValue(byteInt32Array).build(); + } else { + byte[] int64Array = longArrayToByteArray((long[]) value); + ByteString byteInt64Array = ByteString.copyFrom((int64Array)); + return metric.toBuilder().setBytesValue(byteInt64Array).build(); + } + case DoubleArray: + byte[] doubleArray = doublArrayToByteArray((double[]) value); + ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); + return metric.toBuilder().setBytesValue(byteDoubleArray).build(); + case FloatArray: + byte[] floatArray = floatArrayToByteArray((float[]) value); + ByteString byteFloatArray = ByteString.copyFrom(floatArray); + return metric.toBuilder().setBytesValue(byteFloatArray).build(); + case BooleanArray: + byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); + ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); + return metric.toBuilder().setBytesValue(byteBooleanArray).build(); + case StringArray: + byte[] stringArray = stringArrayToByteArray((String[]) value); + ByteString byteStringArray = ByteString.copyFrom(stringArray); + return metric.toBuilder().setBytesValue(byteStringArray).build(); + case DataSet: + return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build(); + case File: + SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value; + ByteString byteFileString = ByteString.copyFrom(file.getBytes()); + return metric.toBuilder().setBytesValue(byteFileString).build(); + case Template: + return metric.toBuilder().setTemplateValue((SparkplugBProto.Payload.Template) value).build(); + case Unknown: + throw new ThingsboardException("Invalid value for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS); + } + return metric; + } + + public static Optional validatedValueByTypeMetric (TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { + try { + switch (metricDataType) { + case Int8: + case Int16: + case UInt8: + case UInt16: + case Int32: + case UInt32: + case Int64: + case UInt64: + case DateTime: + if (kv.getTypeValue()==1) { + return Optional.of(Integer.valueOf(String.valueOf(kv.getLongV()))); + } + break; + case Float: + if (kv.getTypeValue()==2) { + var f = new BigDecimal(String.valueOf(kv.getDoubleV())); + return Optional.of(f.floatValue()); + } + break; + case Double: + if (kv.getTypeValue()==2) { + return Optional.of(kv.getLongV()); + } + break; + case Boolean: + if (kv.getTypeValue()==3) { + return Optional.of(kv.getBoolV()); + } + break; + case String: + case Text: + case UUID: + if (kv.getTypeValue()==4) { + return Optional.of(kv.getStringV()); + } + break; + case Bytes: + case Int8Array: + if (kv.getTypeValue()==5) { +// ByteString byteString = ByteString.copyFrom((byte[]) value); +// return metric.toBuilder().setBytesValue(byteString).build(); + return Optional.of(kv.getJsonV()); + } + break; + case Int16Array: + case UInt8Array: + if (kv.getTypeValue()==5) { +// byte[] int16Array = shortArrayToByteArray((short[]) value); +// ByteString byteInt16Array = ByteString.copyFrom((int16Array)); + return Optional.of(kv.getJsonV()); + } + break; + case Int32Array: + case UInt16Array: + case Int64Array: + case UInt32Array: + case UInt64Array: + case DateTimeArray: + if (kv.getTypeValue()==5) { +// if (value instanceof int[]) { +// byte[] int32Array = integerArrayToByteArray((int[]) value); +// ByteString byteInt32Array = ByteString.copyFrom((int32Array)); +// return metric.toBuilder().setBytesValue(byteInt32Array).build(); +// } else { +// byte[] int64Array = longArrayToByteArray((long[]) value); +// ByteString byteInt64Array = ByteString.copyFrom((int64Array)); +// return metric.toBuilder().setBytesValue(byteInt64Array).build(); +// } + return Optional.of(kv.getJsonV()); + } + break; + case DoubleArray: + if (kv.getTypeValue()==5) { +// byte[] doubleArray = doublArrayToByteArray((double[]) value); +// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); +// return metric.toBuilder().setBytesValue(byteDoubleArray).build(); + return Optional.of(kv.getJsonV()); + } + break; + case FloatArray: + if (kv.getTypeValue()==5) { +// byte[] floatArray = floatArrayToByteArray((float[]) value); +// ByteString byteFloatArray = ByteString.copyFrom(floatArray); +// return metric.toBuilder().setBytesValue(byteFloatArray).build(); + return Optional.of(kv.getJsonV()); + } + break; + case BooleanArray: + if (kv.getTypeValue()==5) { +// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); +// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); +// return metric.toBuilder().setBytesValue(byteBooleanArray).build(); + return Optional.of(kv.getJsonV()); + } + break; + case StringArray: + if (kv.getTypeValue()==5) { +// byte[] stringArray = stringArrayToByteArray((String[]) value); +// ByteString byteStringArray = ByteString.copyFrom(stringArray); +// return metric.toBuilder().setBytesValue(byteStringArray).build(); + return Optional.of(kv.getJsonV()); + } + break; + case DataSet: + case File: + case Template: + log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name()); + return Optional.empty(); + case Unknown: + log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name()); + return Optional.empty(); + } + } catch (Exception e) { + log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); + return Optional.empty(); + } + return Optional.empty(); + } + + private static byte[] shortArrayToByteArray(short[] inputs) { + ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2); + for (short d : inputs) { + bb.putShort(d); + } + return bb.array(); + } + + private static byte[] integerArrayToByteArray(int[] inputs) { + ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4); + for (int d : inputs) { + bb.putInt(d); + } + return bb.array(); + } + + private static byte[] longArrayToByteArray(long[] inputs) { + ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); + for (long d : inputs) { + bb.putLong(d); + } + return bb.array(); + } + + private static byte[] doublArrayToByteArray(double[] inputs) { + ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); + for (double d : inputs) { + bb.putDouble(d); + } + return bb.array(); + } + + private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + DataOutputStream ds = new DataOutputStream(bas); + for (float f : inputs) { + try { + ds.writeFloat(f); + } catch (IOException e) { + throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS); + } + } + return bas.toByteArray(); + } + + private static byte[] booleanArrayToByteArray(boolean[] inputs) { + byte[] toReturn = new byte[inputs.length]; + for (int entry = 0; entry < toReturn.length; entry++) { + toReturn[entry] = (byte) (inputs[entry]?1:0); + } + return toReturn; + } + + private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException { + final ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + final ObjectOutputStream os = new ObjectOutputStream(bas); + os.writeObject(inputs); + os.flush(); + os.close(); + } catch (Exception e) { + throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS); + } + return bas.toByteArray(); + } + + @JsonIgnoreProperties( value = {"fileName"}) @JsonSerialize(