sparkplug: add metricsBirth

This commit is contained in:
nickAS21 2023-02-01 18:37:12 +02:00
parent a01c7a23d7
commit d7d4d861b3
10 changed files with 592 additions and 405 deletions

View File

@ -15,32 +15,23 @@
*/
package org.thingsboard.server.transport.mqtt.sparkplug;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Calendar;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
/**
* Created by nickAS21 on 12.01.23
@ -71,171 +62,28 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
processBeforeTest(configProperties);
}
public void processClientWithCorrectNodeAccess() throws Exception {
public MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH() throws Exception {
long ts = calendar.getTimeInMillis();
long value = bdSeq = 0;
return clientWithCorrectNodeAccessTokenWithNDEATH(ts, value);
}
public MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(long ts, long value) throws Exception {
String key = keysBdSeq;
MetricDataType metricDataType = Int64;
SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
deathPayload.addMetrics(createMetric(value, ts, key, metricDataType));
byte[] deathBytes = deathPayload.build().toByteArray();
this.client = new MqttV5TestClient();
MqttWireMessage response = clientWithCorrectNodeAccessToken(client);
Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType());
MqttConnAck connAckMsg = (MqttConnAck) response;
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
}
protected SparkplugBProto.Payload.Metric createMetric(Object value, TsKvEntry tsKvEntry, MetricDataType metricDataType) throws ThingsboardException {
SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder()
.setTimestamp(tsKvEntry.getTs())
.setName(tsKvEntry.getKey())
.setDatatype(metricDataType.toIntValue())
.build();
switch (metricDataType) {
case Int8:
case Int16:
case UInt8:
case UInt16:
int valueMetric = Integer.valueOf(String.valueOf(value));
return metric.toBuilder().setIntValue(valueMetric).build();
case Int32:
case UInt32:
if (value instanceof Long) {
return metric.toBuilder().setLongValue((long) value).build();
} else {
return metric.toBuilder().setIntValue((int)value).build();
}
case Int64:
case UInt64:
case DateTime:
return metric.toBuilder().setLongValue((long) value).build();
case Float:
return metric.toBuilder().setFloatValue((float) value).build();
case Double:
return metric.toBuilder().setDoubleValue((double) value).build();
case Boolean:
return metric.toBuilder().setBooleanValue((boolean) value).build();
case String:
case Text:
case UUID:
return metric.toBuilder().setStringValue((String) value).build();
case DataSet:
return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build();
case Bytes:
case Int8Array:
ByteString byteString = ByteString.copyFrom((byte[]) value);
return metric.toBuilder().setBytesValue(byteString).build();
case Int16Array:
case UInt8Array:
byte[] int16Array = shortArrayToByteArray((short[]) value);
ByteString byteInt16Array = ByteString.copyFrom((int16Array));
return metric.toBuilder().setBytesValue(byteInt16Array).build();
case Int32Array:
case UInt16Array:
case Int64Array:
case UInt32Array:
case UInt64Array:
case DateTimeArray:
if (value instanceof int[]) {
byte[] int32Array = integerArrayToByteArray((int[]) value);
ByteString byteInt32Array = ByteString.copyFrom((int32Array));
return metric.toBuilder().setBytesValue(byteInt32Array).build();
} else {
byte[] int64Array = longArrayToByteArray((long[]) value);
ByteString byteInt64Array = ByteString.copyFrom((int64Array));
return metric.toBuilder().setBytesValue(byteInt64Array).build();
}
case DoubleArray:
byte[] doubleArray = doublArrayToByteArray((double[]) value);
ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
return metric.toBuilder().setBytesValue(byteDoubleArray).build();
case FloatArray:
byte[] floatArray = floatArrayToByteArray((float[]) value);
ByteString byteFloatArray = ByteString.copyFrom(floatArray);
return metric.toBuilder().setBytesValue(byteFloatArray).build();
case BooleanArray:
byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
return metric.toBuilder().setBytesValue(byteBooleanArray).build();
case StringArray:
byte[] stringArray = stringArrayToByteArray((String[]) value);
ByteString byteStringArray = ByteString.copyFrom(stringArray);
return metric.toBuilder().setBytesValue(byteStringArray).build();
case File:
SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value;
ByteString byteFileString = ByteString.copyFrom(file.getBytes());
return metric.toBuilder().setBytesValue(byteFileString).build();
case Template:
return metric.toBuilder().setTemplateValue((SparkplugBProto.Payload.Template) value).build();
case Unknown:
throw new ThingsboardException("Invalid value for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS);
}
return metric;
}
private byte[] shortArrayToByteArray(short[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2);
for (short d : inputs) {
bb.putShort(d);
}
return bb.array();
}
private byte[] integerArrayToByteArray(int[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4);
for (int d : inputs) {
bb.putInt(d);
}
return bb.array();
}
private byte[] longArrayToByteArray(long[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (long d : inputs) {
bb.putLong(d);
}
return bb.array();
}
private byte[] doublArrayToByteArray(double[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (double d : inputs) {
bb.putDouble(d);
}
return bb.array();
}
private byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException {
ByteArrayOutputStream bas = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bas);
for (float f : inputs) {
try {
ds.writeFloat(f);
} catch (IOException e) {
throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}
return bas.toByteArray();
}
private byte[] booleanArrayToByteArray(boolean[] inputs) {
byte[] toReturn = new byte[inputs.length];
for (int entry = 0; entry < toReturn.length; entry++) {
toReturn[entry] = (byte) (inputs[entry]?1:0);
}
return toReturn;
}
private byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException {
final ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
final ObjectOutputStream os = new ObjectOutputStream(bas);
os.writeObject(inputs);
os.flush();
os.close();
} catch (Exception e) {
throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS);
}
return bas.toByteArray();
}
private MqttWireMessage clientWithCorrectNodeAccessToken(MqttV5TestClient client) throws Exception {
IMqttToken connectionResult = client.connectAndWait(gatewayAccessToken);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(gatewayAccessToken);
String topic = NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode;
MqttMessage msg = new MqttMessage();
msg.setId(0);
msg.setPayload(deathBytes);
options.setWill(topic, msg);
IMqttToken connectionResult = client.connect(options);
return connectionResult.getResponse();
}

View File

@ -17,9 +17,7 @@ package org.thingsboard.server.transport.mqtt.sparkplug.connection;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
@ -34,16 +32,16 @@ import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSpark
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int64;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
/**
* Created by nickAS21 on 12.01.23
@ -51,21 +49,10 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
@Slf4j
public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest {
protected void processClientWithCorrectNodeAccessTokenTest() throws Exception {
processClientWithCorrectNodeAccess();
}
protected void processClientWithCorrectNodeAccessTokenWithNdeathTest() throws Exception {
protected void processClientWithCorrectNodeAccessTokenWithNDEATH_Test() throws Exception {
long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS;
long value = bdSeq = 0;
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, value));
SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
deathPayload.addMetrics(createMetric(value, tsKvEntryBdSecOriginal, metricDataType));
MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(deathPayload.build().toByteArray());
MqttWireMessage response = clientWithCorrectNodeAccessTokenWithNDEATH(ts, value);
Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType());
@ -86,15 +73,23 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
Assert.assertEquals(expectedTsKvEntry, actualTsKvEntry);
}
protected void processClientWithCorrectAccessTokenCreatedDevices(int cntDevices) throws Exception {
processClientWithCorrectNodeAccess();
protected void processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test() throws Exception {
this.client = new MqttV5TestClient();
MqttException actualException = Assert.assertThrows(MqttException.class, () -> client.connectAndWait(gatewayAccessToken));
String expectedMessage = "Server unavailable.";
int expectedReasonCode = 136;
Assert.assertEquals(expectedMessage, actualException.getMessage());
Assert.assertEquals( expectedReasonCode, actualException.getReasonCode());
}
protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
long ts = calendar.getTimeInMillis();
MetricDataType metricDataType = Int32;
Set<String> deviceIds = new HashSet<>();
String keys = "Device Metric int32";
String key = "Device Metric int32";
int valueDeviceInt32 = 1024;
TsKvEntry expectedTsKvEntryDeviceInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueDeviceInt32)));
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, expectedTsKvEntryDeviceInt32, metricDataType);
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
for (int i=0; i < cntDevices; i++ ) {
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis())
@ -105,36 +100,18 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
payloadBirthDevice.build().toByteArray(), 0, false);
deviceIds.add(deviceName);
}
}
Assert.assertEquals(cntDevices, deviceIds.size());
for (String deviceName: deviceIds) {
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after crete")
.atMost(40, TimeUnit.SECONDS)
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
});
}
deviceIds.add(deviceName);
}
Assert.assertEquals(cntDevices, deviceIds.size());
}
private MqttWireMessage clientWithCorrectNodeAccessTokenWithNDEATH(byte[] deathBytes) throws Exception {
this.client = new MqttV5TestClient();
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(gatewayAccessToken);
if (deathBytes != null) {
String topic = NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode;
MqttMessage msg = new MqttMessage();
msg.setId(0);
msg.setPayload(deathBytes);
options.setWill(topic, msg);
}
IMqttToken connectionResult = client.connect(options);
return connectionResult.getResponse();
}
}

View File

@ -39,24 +39,25 @@ public class MqttV5ClientSparkplugBConnectionTest extends AbstractMqttV5ClientSp
}
}
@Test
public void testClientWithCorrectAccessToken() throws Exception {
processClientWithCorrectNodeAccessTokenTest();
}
@Test
public void testClientWithCorrectAccessTokenWithNDEATH() throws Exception {
processClientWithCorrectNodeAccessTokenWithNdeathTest();
processClientWithCorrectNodeAccessTokenWithNDEATH_Test();
}
@Test
public void testClientWithCorrectAccessTokenCreatedOneDevice() throws Exception {
processClientWithCorrectAccessTokenCreatedDevices(1);
public void testClientWithCorrectNodeAccessTokenWithoutNDEATH() throws Exception {
processClientWithCorrectNodeAccessTokenWithoutNDEATH_Test();
}
@Test
public void testClientWithCorrectAccessTokenWithNDEATHCreatedOneDevice() throws Exception {
processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1);
}
@Test
public void testClientWithCorrectAccessTokenCreatedTwoDevice() throws Exception {
processClientWithCorrectAccessTokenCreatedDevices(2);
public void testClientWithCorrectAccessTokenWithNDEATHCreatedTwoDevice() throws Exception {
processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(2);
}
}

View File

@ -64,6 +64,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8Array;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
/**
* Created by nickAS21 on 12.01.23
@ -74,30 +75,28 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
protected ThreadLocalRandom random = ThreadLocalRandom.current();
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
processClientWithCorrectNodeAccess();
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType));
String key = keysBdSeq;
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, key, metricDataType));
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
String keys = "Node Control/Rebirth";
key = "Node Control/Rebirth";
boolean valueRebirth = false;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType));
listKeys.add(keys);
payloadBirthNode.addMetrics(createMetric(valueRebirth, ts, key, metricDataType));
listKeys.add(key);
keys = "Node Metric int32";
key = "Node Metric int32";
int valueNodeInt32 = 1024;
metricDataType = Int32;
TsKvEntry expectedSsKvEntryNodeInt32 = new BasicTsKvEntry(ts, new LongDataEntry(keys, Integer.toUnsignedLong(valueNodeInt32)));
payloadBirthNode.addMetrics(createMetric(valueNodeInt32, expectedSsKvEntryNodeInt32, metricDataType));
listKeys.add(keys);
payloadBirthNode.addMetrics(createMetric(valueNodeInt32, ts, key, metricDataType));
listKeys.add(key);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
@ -113,23 +112,22 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
}
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception {
processClientWithCorrectNodeAccess();
clientWithCorrectNodeAccessTokenWithNDEATH();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
List<String> listKeys = new ArrayList<>();
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
MetricDataType metricDataType = Int64;
TsKvEntry tsKvEntryBdSecOriginal = new BasicTsKvEntry(ts, new LongDataEntry(keysBdSeq, valueBdSec));
payloadBirthNode.addMetrics(createMetric(valueBdSec, tsKvEntryBdSecOriginal, metricDataType));
String key = keysBdSeq;
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, key, metricDataType));
listKeys.add(SparkplugMessageType.NCMD.name() + " " + keysBdSeq);
String keys = "Node Control/Rebirth";
key = "Node Control/Rebirth";
boolean valueRebirth = true;
metricDataType = MetricDataType.Boolean;
TsKvEntry expectedSsKvEntryRebirth = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, valueRebirth));
payloadBirthNode.addMetrics(createMetric(valueRebirth, expectedSsKvEntryRebirth, metricDataType));
listKeys.add(keys);
payloadBirthNode.addMetrics(createMetric(valueRebirth, ts, key, metricDataType));
listKeys.add(key);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NCMD.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
@ -145,7 +143,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
}
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildPrimitiveSimple() throws Exception {
processClientWithCorrectNodeAccess();
clientWithCorrectNodeAccessTokenWithNDEATH();;
String messageTypeName = SparkplugMessageType.NDATA.name();
List<String> listKeys = new ArrayList<>();
@ -175,7 +173,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
}
protected void processClientWithCorrectAccessTokenPushNodeMetricBuildArraysSimple() throws Exception {
processClientWithCorrectNodeAccess();
clientWithCorrectNodeAccessTokenWithNDEATH();;
String messageTypeName = SparkplugMessageType.NDATA.name();
List<String> listKeys = new ArrayList<>();
@ -332,45 +330,44 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
createdAddMetricTsKvJson(dataPayload, keys, strings, ts, StringArray, listTsKvEntry, listKeys);
}
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value,
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String key, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, Long.valueOf(String.valueOf(value))));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, Long.valueOf(String.valueOf(value))));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String keys, Object value,
private TsKvEntry createdAddMetricTsKvFloat(SparkplugBProto.Payload.Builder dataPayload, String key, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
var f = new BigDecimal(String.valueOf(value));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(keys, f.doubleValue()));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, f.doubleValue()));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String keys, double value,
private TsKvEntry createdAddMetricTsKvDouble(SparkplugBProto.Payload.Builder dataPayload, String key, double value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
var d = new BigDecimal(String.valueOf(value));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(keys, d.longValueExact()));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, d.longValueExact()));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String keys, boolean value,
private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String key, boolean value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(keys, value));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, value));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String keys, String value,
private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String key, String value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(keys, value));
dataPayload.addMetrics(createMetric(value, tsKvEntry, metricDataType));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, value));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
return tsKvEntry;
}
private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String keys,
private void createdAddMetricTsKvJson(SparkplugBProto.Payload.Builder dataPayload, String key,
Object values, long ts, MetricDataType metricDataType,
List<TsKvEntry> listTsKvEntry,
List<String> listKeys) throws ThingsboardException {
@ -428,11 +425,11 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
throw new IllegalStateException("Unexpected value: " + metricDataType);
}
if (nodeArray.size() > 0) {
Optional<TsKvEntry> tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(keys, nodeArray.toString())));
Optional<TsKvEntry> tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, nodeArray.toString())));
if (tsKvEntryOptional.isPresent()) {
dataPayload.addMetrics(createMetric(values, tsKvEntryOptional.get(), metricDataType));
dataPayload.addMetrics(createMetric(values, ts, key, metricDataType));
listTsKvEntry.add(tsKvEntryOptional.get());
listKeys.add(keys);
listKeys.add(key);
}
}
}

View File

@ -50,6 +50,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.OtaPackageId;
@ -107,6 +108,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe;
@ -390,13 +392,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// TODO
break;
case NBIRTH:
sparkplugSessionHandler.setMetricsBirthNode (sparkplugBProtoNode.getMetricsList());
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NCMD:
case NDATA:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg);
break;
case NRECORD:
// TODO
break;
@ -410,16 +412,27 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// TODO
break;
case DBIRTH:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
System.out.println();
break;
case DCMD:
case DDATA:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
break;
/**
* TODO
* 7.3.2. Device Death Certificate (DDEATH)
* The Sparkplug Topic Namespace for a device Death Certificate is:
* namespace/group_id/DDEATH/edge_node_id/device_id
* It is the responsibility of the MQTT EoN node to indicate the real-time state of either physical legacy device using
* poll/response protocols and/or local logical devices. If the device becomes unavailable for any reason (no
* response, CRC error, etc.) it is the responsibility of the EoN node to publish a DDEATH on behalf of the end device.
* Immediately upon reception of a DDEATH, any MQTT client subscribed to this device should set the data quality of
* all metrics to STALE and should note the time stamp when the DDEATH message was received.
*/
case DDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg);
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, deviceName);
break;
case DRECORD:
// TODO
@ -437,6 +450,62 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, MqttTopicSubscription subscription, MqttQoS reqQoS) throws ThingsboardException {
SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicName());
if (sparkplugTopic.getGroupId() == null) {
// TODO SUBSCRIBE NameSpace
} else if (sparkplugTopic.getType() == null) {
// TODO SUBSCRIBE GroupId
} else if (sparkplugTopic.isNode()) {
// A node topic
processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1);
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
// TODO
break;
case NRECORD:
// TODO
break;
default:
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
// TODO
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
// TODO
break;
case DRECORD:
// TODO
break;
default:
}
}
}
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
Matcher fwMatcher;
@ -698,8 +767,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
MqttQoS reqQoS = subscription.qualityOfService();
try {
if (sparkplugSessionHandler != null) {
SparkplugTopic sparkplugTopic = parseTopicSubscribe(mqttMsg.payload().topicSubscriptions().get(0).topicName());
sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, sparkplugTopic, reqQoS);
handleSparkplugSubscribeMsg(grantedQoSList, subscription, reqQoS);
activityReported = true;
} else {
switch (topic) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
@ -1060,24 +1129,47 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void checkSparkplugSession(MqttConnectMessage connectMessage) {
/**
* Sparkplug Specification Version 2.2
* 7.1.1. EoN Node Death Certificate (NDEATH)
* The Death Certificate topic for an MQTT EoN node is:
* namespace/group_id/NDEATH/edge_node_id
* The Death Certificate topic and payload described here are not published as an MQTT message by a client, but
* provided as parameters within the MQTT CONNECT control packet when this MQTT EoN node first establishes the
* MQTT Client session.
*/
private void checkSparkplugNodeSession(MqttConnectMessage connectMessage, ChannelHandlerContext ctx) {
try {
if (sparkplugSessionHandler == null && validatedSparkplugTopic(connectMessage)) {
if (sparkplugSessionHandler == null) {
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage);
if (sparkplugTopicNode != null) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(deviceSessionCtx, sessionId, sparkplugTopicNode);
sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode,
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode);
} else {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName());
throw new ThingsboardException("Invalid request body", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
}
} catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice additional info or sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close();
}
}
private boolean validatedSparkplugTopic (MqttConnectMessage connectMessage) {
return StringUtils.isNotBlank(connectMessage.payload().willTopic())
private SparkplugTopic validatedSparkplugTopicConnectedNode (MqttConnectMessage connectMessage) throws ThingsboardException {
if(StringUtils.isNotBlank(connectMessage.payload().willTopic())
&& connectMessage.payload().willMessageInBytes() != null
&& connectMessage.payload().willMessageInBytes().length > 0;
&& connectMessage.payload().willMessageInBytes().length > 0) {
SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic());
if(NDEATH.equals(sparkplugTopicNode.getType())){
return sparkplugTopicNode;
}
}
return null;
}
@Override
@ -1127,7 +1219,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
if (deviceSessionCtx.isSparkplug()) {
checkSparkplugSession(connectMessage);
checkSparkplugNodeSession(connectMessage, ctx);
} else {
checkGatewaySession(sessionMetaData);
}
@ -1169,7 +1261,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
String topic = attrSubTopicType.getAttributesSubTopic();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType);
try {
if (sparkplugSessionHandler != null) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
sparkplugSessionHandler.createMqttPublishMsg(deviceSessionCtx, notification).ifPresent(sparkplugSessionHandler::writeAndFlush);
} else {
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
}
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
}

View File

@ -186,7 +186,7 @@ public abstract class AbstractGatewaySessionHandler {
}
}
ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
public ChannelFuture writeAndFlush(MqttMessage mqttMessage) {
return channel.writeAndFlush(mqttMessage);
}
@ -215,7 +215,7 @@ public abstract class AbstractGatewaySessionHandler {
}, context.getExecutor());
}
private ListenableFuture<MqttDeviceAwareSessionContext> onDeviceConnect(String deviceName, String deviceType) {
ListenableFuture<MqttDeviceAwareSessionContext> onDeviceConnect(String deviceName, String deviceType) {
MqttDeviceAwareSessionContext result = devices.get(deviceName);
if (result == null) {
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
@ -252,7 +252,7 @@ public abstract class AbstractGatewaySessionHandler {
new TransportServiceCallback<>() {
@Override
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ;
GatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg) ;
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
@ -282,8 +282,8 @@ public abstract class AbstractGatewaySessionHandler {
}
}
private AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) {
return this.deviceSessionCtx.isSparkplug() ? new SparkplugSessionCtx(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) :
private GatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg) {
return this.deviceSessionCtx.isSparkplug() ? new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService) :
new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
@ -324,7 +324,7 @@ public abstract class AbstractGatewaySessionHandler {
}
}
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg, ReturnCode.SUCCESS);
}
@ -716,6 +716,7 @@ public abstract class AbstractGatewaySessionHandler {
private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) {
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
System.out.println("Removed device " + deviceName + " from the gateway session");
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
}

View File

@ -19,11 +19,10 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@ -33,23 +32,25 @@ import java.util.stream.Collectors;
public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final Set<SparkplugBProto.Payload.Metric> metricBirth = new HashSet<>();
private final Map<String, SparkplugBProto.Payload.Metric> metricsBirthDevice;
public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
super(sessionId);
this.mqttQoSMap = mqttQoSMap;
this.metricsBirthDevice = new ConcurrentHashMap<>();
}
public ConcurrentMap<MqttTopicMatcher, Integer> getMqttQoSMap() {
return mqttQoSMap;
}
public Set<SparkplugBProto.Payload.Metric> getMetricBirth() {
return metricBirth;
public Map<String, SparkplugBProto.Payload.Metric> getMetricsBirthDevice() {
return metricsBirthDevice;
}
public void setMetricBirth(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricBirth.addAll(metrics);
public void setMetricsBirthDevice(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricsBirthDevice.putAll(metrics.stream()
.collect(Collectors.toMap(metric -> metric.getName(), metric -> metric)));
}
public MqttQoS getQoSForTopic(String topic) {

View File

@ -15,27 +15,23 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
/**
* Created by nickAS21 on 08.12.22
*/
@Slf4j
public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext {
public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{
public SparkplugSessionCtx(AbstractGatewaySessionHandler parent,
public SparkplugDeviceSessionContext(AbstractGatewaySessionHandler parent,
TransportDeviceInfo deviceInfo,
DeviceProfile deviceProfile,
ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
ConcurrentMap<MqttTopicMatcher,
Integer> mqttQoSMap,
TransportService transportService) {
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
}
@ -43,19 +39,7 @@ public class SparkplugSessionCtx extends AbstractGatewayDeviceSessionContext {
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush);
}
private Optional<MqttPublishMessage> createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notification) {
try {
// TODO metrics from notification & MetricsDBIRTH
byte[] payloadInBytes = new byte[3];
String topic = ((SparkplugNodeSessionHandler) parent).sparkplugTopicNode.toString() + "/" + deviceInfo.getDeviceName();
return Optional.of(parent.getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes));
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e);
return Optional.empty();
}
((SparkplugNodeSessionHandler)parent).createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush);
}
}

View File

@ -22,9 +22,7 @@ import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.protobuf.Descriptors;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
@ -32,19 +30,24 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getFromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
/**
* Created by nickAS21 on 12.12.22
@ -52,13 +55,24 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopi
@Slf4j
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
final SparkplugTopic sparkplugTopicNode;
private final SparkplugTopic sparkplugTopicNode;
private final Map<String, SparkplugBProto.Payload.Metric> metricsBirthNode;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, SparkplugTopic sparkplugTopicNode) {
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId,
SparkplugTopic sparkplugTopicNode) {
super(deviceSessionCtx, sessionId);
this.sparkplugTopicNode = sparkplugTopicNode;
this.metricsBirthNode = new ConcurrentHashMap<>();
}
public void setMetricsBirthNode(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricsBirthNode.putAll(metrics.stream()
.collect(Collectors.toMap(metric -> metric.getName(), metric -> metric)));
}
public Map<String, SparkplugBProto.Payload.Metric> getMetricsBirthNode() {
return this.metricsBirthNode;
}
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
@ -72,14 +86,14 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException {
public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
Futures.immediateFuture(this.deviceSessionCtx) : checkDeviceConnected(deviceName);
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
contextListenableFuture.get().setMetricBirth(sparkplugBProto.getMetricsList());
contextListenableFuture.get().setMetricsBirthDevice(sparkplugBProto.getMetricsList());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed add Metrics. MessageType *BIRTH.", e);
}
@ -115,57 +129,21 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, SparkplugTopic sparkplugTopic, MqttQoS reqQoS) {
if (sparkplugTopic.getGroupId() == null) {
// TODO SUBSCRIBE NameSpace
} else if (sparkplugTopic.getType() == null) {
// TODO SUBSCRIBE GroupId
} else if (sparkplugTopic.isNode()) {
// A node topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case NBIRTH:
// TODO
break;
case NCMD:
// TODO
break;
case NDATA:
// TODO
break;
case NDEATH:
// TODO
break;
case NRECORD:
// TODO
break;
default:
public void onDeviceDisconnect(MqttPublishMessage mqttMsg, String deviceName) throws AdaptorException {
try {
processOnDisconnect(mqttMsg, deviceName);
} catch (RuntimeException e) {
throw new AdaptorException(e);
}
} else {
// A device topic
switch (sparkplugTopic.getType()) {
case STATE:
// TODO
break;
case DBIRTH:
// TODO
break;
case DCMD:
// TODO
break;
case DDATA:
// TODO
break;
case DDEATH:
// TODO
break;
case DRECORD:
// TODO
break;
default:
}
private ListenableFuture<MqttDeviceAwareSessionContext> onDeviceConnectProto(String deviceName) throws ThingsboardException {
try {
String deviceType = this.gateway.getDeviceType() + "-node";
return onDeviceConnect(deviceName, deviceType);
} catch (RuntimeException e) {
log.error("Failed Sparkplug Device connect proto!", e);
throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
}
@ -176,7 +154,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
long ts = protoMetric.getTimestamp();
String keys = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = getFromSparkplugBMetricToKeyValueProto(keys, protoMetric);
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(keys, protoMetric);
if (keyValueProtoOpt.isPresent()) {
List<TransportProtos.KeyValueProto> result = new ArrayList<>();
result.add(keyValueProtoOpt.get());
@ -209,17 +187,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
public void onDeviceConnectProto(MqttPublishMessage mqttPublishMessage, String nodeDeviceType) throws ThingsboardException {
try {
String topic = mqttPublishMessage.variableHeader().topicName();
SparkplugTopic sparkplugTopic = parseTopicSubscribe(topic);
String deviceName = checkDeviceName(sparkplugTopic.getDeviceId());
String deviceType = StringUtils.isEmpty(nodeDeviceType) ? DEFAULT_DEVICE_TYPE : nodeDeviceType;
processOnConnect(mqttPublishMessage, deviceName, deviceType);
} catch (RuntimeException | ThingsboardException e) {
log.error("Failed Sparkplug Device connect proto!", e);
throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
public SparkplugTopic getSparkplugTopicNode() {
return this.sparkplugTopicNode;
}
public Optional<MqttPublishMessage> createMqttPublishMsg (MqttDeviceAwareSessionContext ctx,
TransportProtos.AttributeUpdateNotificationMsg notification,
String... deviceName) {
try {
long ts = notification.getSharedUpdated(0).getTs();
String key = notification.getSharedUpdated(0).getKv().getKey();
if (metricsBirthNode.containsKey(key)) {
SparkplugBProto.Payload.Metric metricBirth = metricsBirthNode.get(key);
MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype());
Optional value = validatedValueByTypeMetric(notification.getSharedUpdated(0).getKv(), metricDataType);
if (value.isPresent()) {
SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts);
cmdPayload.addMetrics(createMetric(value, ts, key, metricDataType));
byte[] payloadInBytes = cmdPayload.build().toByteArray();
String topic = deviceName == null ? sparkplugTopicNode.toString() : sparkplugTopicNode.toString()
+ "/" + deviceName;
return Optional.of(getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes));
}
} else {
return Optional.empty();
}
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e);
return Optional.empty();
}
return Optional.empty();
}
}

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.ser.std.FileSerializer;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
@ -26,7 +27,11 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.DoubleBuffer;
@ -45,7 +50,7 @@ import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
@Slf4j
public class SparkplugMetricUtil {
public static Optional<TransportProtos.KeyValueProto> getFromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
public static Optional<TransportProtos.KeyValueProto> fromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
// Check if the null flag has been set indicating that the value is null
if (protoMetric.getIsNull()) {
return Optional.empty();
@ -214,6 +219,283 @@ public class SparkplugMetricUtil {
}
}
public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException {
SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder()
.setTimestamp(ts)
.setName(key)
.setDatatype(metricDataType.toIntValue())
.build();
switch (metricDataType) {
case Int8:
case Int16:
case UInt8:
case UInt16:
int valueMetric = Integer.valueOf(String.valueOf(value));
return metric.toBuilder().setIntValue(valueMetric).build();
case Int32:
case UInt32:
if (value instanceof Long) {
return metric.toBuilder().setLongValue((long) value).build();
} else {
return metric.toBuilder().setIntValue((int)value).build();
}
case Int64:
case UInt64:
case DateTime:
return metric.toBuilder().setLongValue((long) value).build();
case Float:
return metric.toBuilder().setFloatValue((float) value).build();
case Double:
return metric.toBuilder().setDoubleValue((double) value).build();
case Boolean:
return metric.toBuilder().setBooleanValue((boolean) value).build();
case String:
case Text:
case UUID:
return metric.toBuilder().setStringValue((String) value).build();
case Bytes:
case Int8Array:
ByteString byteString = ByteString.copyFrom((byte[]) value);
return metric.toBuilder().setBytesValue(byteString).build();
case Int16Array:
case UInt8Array:
byte[] int16Array = shortArrayToByteArray((short[]) value);
ByteString byteInt16Array = ByteString.copyFrom((int16Array));
return metric.toBuilder().setBytesValue(byteInt16Array).build();
case Int32Array:
case UInt16Array:
case Int64Array:
case UInt32Array:
case UInt64Array:
case DateTimeArray:
if (value instanceof int[]) {
byte[] int32Array = integerArrayToByteArray((int[]) value);
ByteString byteInt32Array = ByteString.copyFrom((int32Array));
return metric.toBuilder().setBytesValue(byteInt32Array).build();
} else {
byte[] int64Array = longArrayToByteArray((long[]) value);
ByteString byteInt64Array = ByteString.copyFrom((int64Array));
return metric.toBuilder().setBytesValue(byteInt64Array).build();
}
case DoubleArray:
byte[] doubleArray = doublArrayToByteArray((double[]) value);
ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
return metric.toBuilder().setBytesValue(byteDoubleArray).build();
case FloatArray:
byte[] floatArray = floatArrayToByteArray((float[]) value);
ByteString byteFloatArray = ByteString.copyFrom(floatArray);
return metric.toBuilder().setBytesValue(byteFloatArray).build();
case BooleanArray:
byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
return metric.toBuilder().setBytesValue(byteBooleanArray).build();
case StringArray:
byte[] stringArray = stringArrayToByteArray((String[]) value);
ByteString byteStringArray = ByteString.copyFrom(stringArray);
return metric.toBuilder().setBytesValue(byteStringArray).build();
case DataSet:
return metric.toBuilder().setDatasetValue((SparkplugBProto.Payload.DataSet) value).build();
case File:
SparkplugMetricUtil.File file = (SparkplugMetricUtil.File) value;
ByteString byteFileString = ByteString.copyFrom(file.getBytes());
return metric.toBuilder().setBytesValue(byteFileString).build();
case Template:
return metric.toBuilder().setTemplateValue((SparkplugBProto.Payload.Template) value).build();
case Unknown:
throw new ThingsboardException("Invalid value for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS);
}
return metric;
}
public static Optional<Object> validatedValueByTypeMetric (TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
try {
switch (metricDataType) {
case Int8:
case Int16:
case UInt8:
case UInt16:
case Int32:
case UInt32:
case Int64:
case UInt64:
case DateTime:
if (kv.getTypeValue()==1) {
return Optional.of(Integer.valueOf(String.valueOf(kv.getLongV())));
}
break;
case Float:
if (kv.getTypeValue()==2) {
var f = new BigDecimal(String.valueOf(kv.getDoubleV()));
return Optional.of(f.floatValue());
}
break;
case Double:
if (kv.getTypeValue()==2) {
return Optional.of(kv.getLongV());
}
break;
case Boolean:
if (kv.getTypeValue()==3) {
return Optional.of(kv.getBoolV());
}
break;
case String:
case Text:
case UUID:
if (kv.getTypeValue()==4) {
return Optional.of(kv.getStringV());
}
break;
case Bytes:
case Int8Array:
if (kv.getTypeValue()==5) {
// ByteString byteString = ByteString.copyFrom((byte[]) value);
// return metric.toBuilder().setBytesValue(byteString).build();
return Optional.of(kv.getJsonV());
}
break;
case Int16Array:
case UInt8Array:
if (kv.getTypeValue()==5) {
// byte[] int16Array = shortArrayToByteArray((short[]) value);
// ByteString byteInt16Array = ByteString.copyFrom((int16Array));
return Optional.of(kv.getJsonV());
}
break;
case Int32Array:
case UInt16Array:
case Int64Array:
case UInt32Array:
case UInt64Array:
case DateTimeArray:
if (kv.getTypeValue()==5) {
// if (value instanceof int[]) {
// byte[] int32Array = integerArrayToByteArray((int[]) value);
// ByteString byteInt32Array = ByteString.copyFrom((int32Array));
// return metric.toBuilder().setBytesValue(byteInt32Array).build();
// } else {
// byte[] int64Array = longArrayToByteArray((long[]) value);
// ByteString byteInt64Array = ByteString.copyFrom((int64Array));
// return metric.toBuilder().setBytesValue(byteInt64Array).build();
// }
return Optional.of(kv.getJsonV());
}
break;
case DoubleArray:
if (kv.getTypeValue()==5) {
// byte[] doubleArray = doublArrayToByteArray((double[]) value);
// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
// return metric.toBuilder().setBytesValue(byteDoubleArray).build();
return Optional.of(kv.getJsonV());
}
break;
case FloatArray:
if (kv.getTypeValue()==5) {
// byte[] floatArray = floatArrayToByteArray((float[]) value);
// ByteString byteFloatArray = ByteString.copyFrom(floatArray);
// return metric.toBuilder().setBytesValue(byteFloatArray).build();
return Optional.of(kv.getJsonV());
}
break;
case BooleanArray:
if (kv.getTypeValue()==5) {
// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
// return metric.toBuilder().setBytesValue(byteBooleanArray).build();
return Optional.of(kv.getJsonV());
}
break;
case StringArray:
if (kv.getTypeValue()==5) {
// byte[] stringArray = stringArrayToByteArray((String[]) value);
// ByteString byteStringArray = ByteString.copyFrom(stringArray);
// return metric.toBuilder().setBytesValue(byteStringArray).build();
return Optional.of(kv.getJsonV());
}
break;
case DataSet:
case File:
case Template:
log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name());
return Optional.empty();
case Unknown:
log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name());
return Optional.empty();
}
} catch (Exception e) {
log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
return Optional.empty();
}
return Optional.empty();
}
private static byte[] shortArrayToByteArray(short[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2);
for (short d : inputs) {
bb.putShort(d);
}
return bb.array();
}
private static byte[] integerArrayToByteArray(int[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4);
for (int d : inputs) {
bb.putInt(d);
}
return bb.array();
}
private static byte[] longArrayToByteArray(long[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (long d : inputs) {
bb.putLong(d);
}
return bb.array();
}
private static byte[] doublArrayToByteArray(double[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (double d : inputs) {
bb.putDouble(d);
}
return bb.array();
}
private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException {
ByteArrayOutputStream bas = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bas);
for (float f : inputs) {
try {
ds.writeFloat(f);
} catch (IOException e) {
throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}
return bas.toByteArray();
}
private static byte[] booleanArrayToByteArray(boolean[] inputs) {
byte[] toReturn = new byte[inputs.length];
for (int entry = 0; entry < toReturn.length; entry++) {
toReturn[entry] = (byte) (inputs[entry]?1:0);
}
return toReturn;
}
private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException {
final ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
final ObjectOutputStream os = new ObjectOutputStream(bas);
os.writeObject(inputs);
os.flush();
os.close();
} catch (Exception e) {
throw new ThingsboardException("Invalid value float ", ThingsboardErrorCode.INVALID_ARGUMENTS);
}
return bas.toByteArray();
}
@JsonIgnoreProperties(
value = {"fileName"})
@JsonSerialize(