diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 05a5843626..13e9dcaf0e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -29,6 +29,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -67,7 +68,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; /** * Created by nickAS21 on 12.01.23 @@ -112,31 +113,44 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte long value = bdSeq = 0; clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); } + public void clientWithCorrectNodeAccessTokenWithNDEATH(Long alias) throws Exception { + long ts = calendar.getTimeInMillis(); + long value = bdSeq = 0; + clientMqttV5ConnectWithNDEATH(ts, value,alias); + } public void clientWithCorrectNodeAccessTokenWithNDEATH(long ts, long value) throws Exception { - IMqttToken connectionResult = clientConnectWithNDEATH(ts, value); + IMqttToken connectionResult = clientMqttV5ConnectWithNDEATH(ts, value, -1L); MqttWireMessage response = connectionResult.getResponse(); Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType()); MqttConnAck connAckMsg = (MqttConnAck) response; Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); } - public IMqttToken clientConnectWithNDEATH(long ts, long value, String... nameSpaceBad) throws Exception { - String key = keysBdSeq; + public IMqttToken clientMqttV5ConnectWithNDEATH(long ts, long value, Long alias, String... nameSpaceBad) throws Exception { + return clientMqttV5ConnectWithNDEATH(ts, value, null, alias, nameSpaceBad); + } + public IMqttToken clientMqttV5ConnectWithNDEATH(long ts, long value, String metricName, Long alias, String... nameSpaceBad) throws Exception { + String key = metricName == null ? keysBdSeq : metricName; MetricDataType metricDataType = Int64; SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); - deathPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + deathPayload.addMetrics(createMetric(value, ts, key, metricDataType, alias)); byte[] deathBytes = deathPayload.build().toByteArray(); this.client = new MqttV5TestClient(); this.mqttCallback = new SparkplugMqttCallback(); this.client.setCallback(this.mqttCallback); MqttConnectionOptions options = new MqttConnectionOptions(); + // If the MQTT client is using MQTT v5.0, the Edge Node’s MQTT CONNECT packet MUST set the Clean Start flag to true and the Session Expiry Interval to 0 + options.setCleanStart(true); + options.setSessionExpiryInterval(0L); options.setUserName(gatewayAccessToken); - String nameSpace = nameSpaceBad.length == 0 ? NAMESPACE : nameSpaceBad[0]; + String nameSpace = nameSpaceBad.length == 0 ? TOPIC_ROOT_SPB_V_1_0 : nameSpaceBad[0]; String topic = nameSpace + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode; + // The NDEATH message MUST set the MQTT Will QoS to 1 and Retained flag to false MqttMessage msg = new MqttMessage(); msg.setId(0); + msg.setQos(1); msg.setPayload(deathBytes); options.setWill(topic, msg); return client.connect(options); @@ -148,19 +162,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte MetricDataType metricDataType = Int32; String key = "Node Metric int32"; int valueDeviceInt32 = 1024; - SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType, -1L); SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() .setTimestamp(ts) .setSeq(getBdSeqNum()); payloadBirthNode.addMetrics(metric); payloadBirthNode.setTimestamp(ts); if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } valueDeviceInt32 = 4024; - metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32); + metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32, -1L); for (int i = 0; i < cntDevices; i++) { SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() .setTimestamp(ts) @@ -169,7 +183,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte payloadBirthDevice.addMetrics(metric); if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, payloadBirthDevice.build().toByteArray(), 0, false); AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") @@ -187,6 +201,49 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return devices; } + protected List connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(long ts) throws Exception { + List devices = new ArrayList<>(); + Long alias = 0L; + clientWithCorrectNodeAccessTokenWithNDEATH(alias++); + MetricDataType metricDataType = Int32; + String key = "Node Metric int32"; + int valueDeviceInt32 = 1024; + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType, alias++); + SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getBdSeqNum()); + payloadBirthNode.addMetrics(metric); + payloadBirthNode.setTimestamp(ts); + if (client.isConnected()) { + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + payloadBirthNode.build().toByteArray(), 0, false); + } + + valueDeviceInt32 = 4024; + metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32, alias++); + SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getSeqNum()); + String deviceName = deviceId + "_" + 1; + + payloadBirthDevice.addMetrics(metric); + if (client.isConnected()) { + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, + payloadBirthDevice.build().toByteArray(), 0, false); + AtomicReference device = new AtomicReference<>(); + await(alias + "find device [" + deviceName + "] after created") + .atMost(200, TimeUnit.SECONDS) + .until(() -> { + device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); + return device.get() != null; + }); + devices.add(device.get()); + } + + Assert.assertEquals(1, devices.size()); + return devices; + } + protected long getBdSeqNum() throws Exception { if (bdSeq == 256) { bdSeq = 0; @@ -207,16 +264,20 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte .setTimestamp(calendar.getTimeInMillis()); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long valueBdSec = getBdSeqNum(); - payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64)); + payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64, -1L)); listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); - payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean)); + payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean, -1L)); listKeys.add(keyNodeRebirth); - payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType)); + if (StringUtils.isNotBlank(metricKey)) { + payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType, -1L)); + } else { + payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType, 4L)); + } listKeys.add(metricKey); if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } return listKeys; @@ -297,7 +358,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String key, Object value, long ts, MetricDataType metricDataType) throws ThingsboardException { TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, Long.valueOf(String.valueOf(value)))); - dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L)); return tsKvEntry; } @@ -305,7 +366,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte long ts, MetricDataType metricDataType) throws ThingsboardException { Double dd = Double.parseDouble(Float.toString(value)); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, dd)); - dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L)); return tsKvEntry; } @@ -313,21 +374,21 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte long ts, MetricDataType metricDataType) throws ThingsboardException { Long l = Double.valueOf(value).longValue(); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, l)); - dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L)); return tsKvEntry; } private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String key, boolean value, long ts, MetricDataType metricDataType) throws ThingsboardException { TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, value)); - dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L)); return tsKvEntry; } private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String key, String value, long ts, MetricDataType metricDataType) throws ThingsboardException { TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, value)); - dataPayload.addMetrics(createMetric(value, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L)); return tsKvEntry; } @@ -348,7 +409,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte if (nodeArray.size() > 0) { Optional tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, nodeArray.toString()))); if (tsKvEntryOptional.isPresent()) { - dataPayload.addMetrics(createMetric(values, ts, key, metricDataType)); + dataPayload.addMetrics(createMetric(values, ts, key, metricDataType, -1L)); listTsKvEntry.add(tsKvEntryOptional.get()); listKeys.add(key); } @@ -416,7 +477,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return java.util.UUID.randomUUID().toString(); } - public class SparkplugMqttCallback implements MqttCallback { + public class SparkplugMqttCallback implements MqttCallback { private final List messageArrivedMetrics = new ArrayList<>(); @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 4ca7ea5661..e4e76e6761 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -16,7 +16,6 @@ package org.thingsboard.server.transport.mqtt.sparkplug.attributes; import com.fasterxml.jackson.core.type.TypeReference; -import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.thingsboard.server.common.data.Device; @@ -33,8 +32,10 @@ import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DDATA; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDATA; /** * Created by nickAS21 on 12.01.23 @@ -50,7 +51,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Assert.assertTrue(listKeys.contains(keyNodeRebirth)); String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}"; Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); await(alias + SparkplugMessageType.NBIRTH.name()) @@ -76,7 +76,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Object metricValue = nextBoolean(); connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); // Boolean <-> String @@ -140,7 +139,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Object metricValue = nextUInt32(); connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); // Long <-> String @@ -192,7 +190,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Object metricValue = nextFloat(30, 400); connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); // Float <-> String @@ -237,6 +234,67 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getFloatValue()); } + protected void processClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); + awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.ATTRIBUTES, 1); + + // Node Edge + SparkplugMessageType messageType = NCMD; + String keyBirthNameNode = "Node Metric int32"; + int valueBirthNameNode = 1024; + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameNode + "\":" + valueBirthNameNode + "}}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(keyBirthNameNode, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(valueBirthNameNode == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + messageType = NDATA; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameNode + "\":" + valueBirthNameNode + "}}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(keyBirthNameNode, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(valueBirthNameNode == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + // Device + messageType = DCMD; + String keyBirthNameDevice = metricBirthName_Int32; + int valueBirthNameDevice = 123456; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameDevice + "\":" + valueBirthNameDevice + "}}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + DCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(keyBirthNameDevice, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(valueBirthNameDevice == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + messageType = DDATA; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameDevice + "\":" + valueBirthNameDevice + "}}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + DCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(keyBirthNameDevice, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(valueBirthNameDevice == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + } + protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); MetricDataType metricDataType = MetricDataType.Double; @@ -244,7 +302,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Object metricValue = nextDouble(); connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); // Double <-> String @@ -296,7 +353,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Object metricValue = nextString(); connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1); // String <-> Long @@ -356,7 +412,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra return mqttCallback.getMessageArrivedMetrics().size() == 1; }); Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); - Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java index a8d67360be..cecad77c7a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java @@ -57,6 +57,10 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp public void testClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk(); } + @Test + public void testClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk() throws Exception { + processClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk(); + } @Test public void testClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index 459596e38f..9b22bd52d9 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -40,7 +40,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; /** * Created by nickAS21 on 12.01.23 @@ -80,7 +80,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra protected void processClientWithCorrectNodeAccessTokenNameSpaceInvalid_Test() throws Exception { long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long value = bdSeq = 0; - MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientConnectWithNDEATH(ts, value, "spBv1.2")); + MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientMqttV5ConnectWithNDEATH(ts, value, -1L,"spBv1.2")); String expectedMessage = "Server unavailable."; int expectedReasonCode = 136; Assert.assertEquals(expectedMessage, actualException.getMessage()); @@ -135,7 +135,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra if (client.isConnected()) { List devicesList = new ArrayList<>(devices); Device device = devicesList.get(indexDeviceDisconnect); - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(), + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(), payloadDeathDevice.build().toByteArray(), 0, false); await(alias + messageName(STATE) + ", device: " + device.getName()) .atMost(40, TimeUnit.SECONDS) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java index 8d351bbd9d..96fc9c0a4c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java @@ -15,8 +15,8 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.rpc; -import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.common.MqttException; import org.junit.Assert; import org.junit.Test; import org.thingsboard.server.common.data.Device; @@ -32,7 +32,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; @Slf4j public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest { @@ -45,7 +44,6 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie clientWithCorrectNodeAccessTokenWithNDEATH(); connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1); String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}"; String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway); @@ -63,9 +61,10 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { long ts = calendar.getTimeInMillis(); List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.RPC, 1); String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}"; String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0)); - await(alias + NCMD.name()) + await(alias + DCMD.name()) .atMost(40, TimeUnit.SECONDS) .until(() -> { return mqttCallback.getMessageArrivedMetrics().size() == 1; @@ -75,12 +74,41 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); } + @Test + public void processClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(ts); + awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.RPC, 1); + String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}"; + String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0)); + await(alias + DCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(expected, actual); + Assert.assertFalse(mqttCallback.getMessageArrivedMetrics().get(0).hasName()); + Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).hasAlias()); + Assert.assertTrue(2L == mqttCallback.getMessageArrivedMetrics().get(0).getAlias()); + Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + @Test + public void processClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS() throws Exception { + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; + long value = bdSeq = 0; + MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientMqttV5ConnectWithNDEATH(ts, value, "",4L)); + String expectedMessage = "Server unavailable."; + int expectedReasonCode = 136; + Assert.assertEquals(expectedMessage, actualException.getMessage()); + Assert.assertEquals(expectedReasonCode, actualException.getReasonCode()); + } + @Test public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1); String invalidateTypeMessageName = "RCMD"; String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " + @@ -94,7 +122,6 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie clientWithCorrectNodeAccessTokenWithNDEATH(); connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1); String metricNameBad = metricBirthName_Int32 + "_Bad"; String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java index 10ef53260c..eb6c613940 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java @@ -47,6 +47,15 @@ public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest { public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success(); } + @Test + public void testClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success() throws Exception { + processClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success(); + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS() throws Exception { + processClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS(); + } @Test public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index c5d0dec98b..8368db546b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; /** * Created by nickAS21 on 12.01.23 @@ -67,7 +67,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, ndataPayload.build().toByteArray(), 0, false); } @@ -96,7 +96,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac createdAddMetricValueArraysPrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts); if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, + client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + messageTypeName + "/" + edgeNode, ndataPayload.build().toByteArray(), 0, false); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 5aea93528a..7f9da9e0b9 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -28,6 +28,8 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty; +import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; @@ -84,6 +86,7 @@ import org.thingsboard.server.transport.mqtt.limits.SessionLimits; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; +import org.thingsboard.server.transport.mqtt.session.SparkplugDeviceSessionContext; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; @@ -101,6 +104,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map.Entry; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; @@ -120,9 +124,14 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProtoFromJsonNode; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic.parseTopic; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.parseTopicPublish; /** * @author Andrew Shvayka @@ -444,11 +453,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + /** + * It may be the case that an Edge Node has many dynamic associated devices. + * Publish: spBv1.0/G1/DBIRTH/E1/+ + * Publish: spBv1.0/G1/DDATA/E1/+ + * Publish: spBv1.0/G1/DCMD/E1/+ + * Publish: spBv1.0/G1/DDEATH/E1/+ + * @param ctx + * @param topicName + * @param mqttMsg + */ + private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsg) { int msgId = mqttMsg.variableHeader().packetId(); try { SparkplugTopic sparkplugTopic = parseTopicPublish(topicName); - if (sparkplugTopic.isNode()) { + boolean isWildcardInPublish = topicName.contains("+"); + if (!isWildcardInPublish && sparkplugTopic.isNode()) { // A node topic SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); switch (sparkplugTopic.getType()) { @@ -457,21 +478,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case NDATA: sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, sparkplugTopic); break; + case NDEATH: + if (sparkplugSessionHandler.onValidateNDEATH(sparkplugBProtoNode)) { + doDisconnect(); + break; + } else { + throw new ThingsboardException(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY + " of " + NDEATH.name() + " is not equals " + + SPARKPLUG_BD_SEQUENCE_NUMBER_KEY + " of " + NBIRTH.name(), ThingsboardErrorCode.BAD_REQUEST_PARAMS); + } default: } } else { // A device topic SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload())); - switch (sparkplugTopic.getType()) { - case DBIRTH: - case DCMD: - case DDATA: - sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic); - break; - case DDEATH: - sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId()); - break; - default: + if (isWildcardInPublish) { + for (Entry entry : sparkplugSessionHandler.getDevices().entrySet()) { + String deviceName = entry.getKey(); + SparkplugTopic sparkplugTopicDevice = sparkplugTopic; + sparkplugTopicDevice.updateDeviceIdPlus(deviceName); + handleSparkplugPublishDeviceMsg(sparkplugTopicDevice, msgId, mqttMsg, sparkplugBProtoDevice); + } + } else { + handleSparkplugPublishDeviceMsg(sparkplugTopic, msgId, mqttMsg, sparkplugBProtoDevice); } } } catch (RuntimeException e) { @@ -484,6 +512,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + /** + * It may be the case that an Edge Node has many dynamic associated devices. + * Publish: spBv1.0/G1/DBIRTH/E1/+ + * Publish: spBv1.0/G1/DDATA/E1/+ + * Publish: spBv1.0/G1/DCMD/E1/+ + * Publish: spBv1.0/G1/DDEATH/E1/+ + * @param sparkplugTopic + * @param msgId + * @param mqttMsg + * @throws AdaptorException + * @throws ThingsboardException + * @throws InvalidProtocolBufferException + */ + private void handleSparkplugPublishDeviceMsg(SparkplugTopic sparkplugTopic, int msgId, + MqttPublishMessage mqttMsg, SparkplugBProto.Payload sparkplugBProtoDevice) + throws AdaptorException, ThingsboardException, InvalidProtocolBufferException { + // A device topic + switch (sparkplugTopic.getType()) { + case DBIRTH: + case DCMD: + case DDATA: + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic); + break; + case DDEATH: + sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId()); + break; + default: + } + } + private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { try { Matcher fwMatcher; @@ -786,7 +844,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } try { if (sparkplugSessionHandler != null) { - sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, subscription, reqQoS); + sparkplugSessionHandler.handleSparkplugSubscribeMsg(subscription); activityReported = true; } else { switch (topic) { @@ -877,13 +935,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement registerSubQoS(topic, grantedQoSList, reqQoS); } - public void processAttributesRpcSubscribeSparkplugNode(List grantedQoSList, MqttQoS reqQoS) { + /** + * 3.0.0 Edge Node Session Establishment: + * ncmd-subscribe + * [tck-id-message-flow-edge-node-ncmd-subscribe] The MQTT client associated with the Edge + * Node MUST subscribe to a topic of the form spBv1.0/group_id/NCMD/edge_node_id where + * group_id is the Sparkplug Group ID and the edge_node_id is the Sparkplug Edge Node ID for + * this Edge Node. It MUST subscribe on this topic with a QoS of 1. + */ + public void processAttributesRpcSubscribeSparkplugNode() { + List grantedQoSList = new ArrayList<>(); transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() .setSessionInfo(deviceSessionCtx.getSessionInfo()) .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) .build(), null); - registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS); + registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, AT_LEAST_ONCE); } public void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { @@ -1180,12 +1247,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void checkSparkplugNodeSession(MqttConnectMessage connectMessage, ChannelHandlerContext ctx, SessionMetaData sessionMetaData) { try { if (sparkplugSessionHandler == null) { - SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage); - if (sparkplugTopicNode != null) { + SparkplugTopic sparkplugTopic = validatedSparkplugConnectedWillTopic(connectMessage); + if (sparkplugTopic != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); - sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode); - sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode); + sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopic); sessionMetaData.setOverwriteActivityTime(true); + // ncmd-subscribe + processAttributesRpcSubscribeSparkplugNode(); } else { log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName()); throw new ThingsboardException("Invalid request body", ThingsboardErrorCode.BAD_REQUEST_PARAMS); @@ -1198,12 +1267,33 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private SparkplugTopic validatedSparkplugTopicConnectedNode(MqttConnectMessage connectMessage) throws ThingsboardException { + /** + * The Death Certificate topic and payload described here are not “published” as an MQTT message by a client, + * but provided as parameters within the MQTT CONNECT control packet when this Sparkplug Edge Node first establishes the MQTT Client session. + * - NDEATH message MUST be registered as a Will Message in the MQTT CONNECT packet. + * -- in the MQTT CONNECT packet The NDEATH message MUST set the MQTT Will QoS to 1. + * -- in the MQTT CONNECT packet The NDEATH message MUST set the MQTT Will Retained flag to false. + * -- If the MQTT client is using MQTT v3.1.1, the Edge Node’s MQTT CONNECT packet MUST set the Clean Session flag to true. + * -- If the MQTT client is using MQTT v5.0, the Edge Node’s MQTT CONNECT packet MUST set the Clean Start flag to true and the Session Expiry Interval to 0 + * @param connectMessage + * @return + * @throws ThingsboardException + */ + private SparkplugTopic validatedSparkplugConnectedWillTopic(MqttConnectMessage connectMessage) throws ThingsboardException { if (StringUtils.isNotBlank(connectMessage.payload().willTopic()) && connectMessage.payload().willMessageInBytes() != null && connectMessage.payload().willMessageInBytes().length > 0) { - SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic()); + SparkplugTopic sparkplugTopicNode = parseTopic(connectMessage.payload().willTopic()); if (NDEATH.equals(sparkplugTopicNode.getType())) { + if (connectMessage.variableHeader().willQos() != 1 || connectMessage.variableHeader().isWillRetain()) + return null; + if (!connectMessage.variableHeader().isCleanSession()) return null; + int mqttVer = connectMessage.variableHeader().version(); + if (mqttVer == 5) { + Object sessionExpiryIntervalObj = connectMessage.variableHeader().properties().isEmpty() ? null : connectMessage.variableHeader().properties().getProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL.value()); + Integer sessionExpiryInterval = sessionExpiryIntervalObj == null ? null : ((IntegerProperty) sessionExpiryIntervalObj).value(); + if (sessionExpiryInterval == null || sessionExpiryInterval != 0) return null; + } return sparkplugTopicNode; } } @@ -1302,18 +1392,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { - log.trace("[{}] Received attributes update notification to device", sessionId); try { if (sparkplugSessionHandler != null) { - log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); - notification.getSharedUpdatedList().forEach(tsKvProto -> { - if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { - SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), - SparkplugMessageType.NCMD); + log.trace("[{}] Received attributes update notification to sparkplug Edge Node", sessionId); + notification.getSharedUpdatedList().forEach(tsKvProtoShared -> { + SparkplugMessageType messageType = NCMD; + TransportProtos.TsKvProto tsKvProto = tsKvProtoShared; + if ("JSON_V".equals(tsKvProtoShared.getKv().getType().name())) { + try { + messageType = SparkplugMessageType.parseMessageType(tsKvProtoShared.getKv().getKey()); + tsKvProto = getTsKvProtoFromJsonNode(JacksonUtil.toJsonNode(tsKvProtoShared.getKv().getJsonV()), tsKvProtoShared.getTs()); + } catch (ThingsboardException e) { + messageType = null; + log.error("Failed attributes update notification to sparkplug Edge Node [{}]. ", sparkplugSessionHandler.getSparkplugTopicNode().getEdgeNodeId(), e); + } + } + if (messageType != null && messageType.isSubscribe() && messageType.isNode() + && sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), messageType); sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, sparkplugTopic.toString(), sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(sparkplugSessionHandler::writeAndFlush); + } else { + log.trace("Failed attributes update notification to sparkplug Edge Node [{}]. ", sparkplugSessionHandler.getSparkplugTopicNode().getEdgeNodeId()); } }); } else { @@ -1322,7 +1424,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } } catch (Exception e) { - log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e); + log.trace("[{}] Failed to convert device/Edge Node attributes update to MQTT msg", sessionId, e); } } @@ -1359,7 +1461,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos. + ToServerRpcRequestMsg rpcRequestMsg) { var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index fa06150b1a..91e7dedbf2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -315,6 +315,15 @@ public abstract class AbstractGatewaySessionHandler getDevices () { + return this.devices; + } + private TransportServiceCallback getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) { return new TransportServiceCallback() { @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index 1be6274edb..042ea129a4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; import lombok.extern.slf4j.Slf4j; +import org.springframework.dao.DuplicateKeyException; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; @@ -33,9 +34,10 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProtoFromJsonNode; @Slf4j public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext { @@ -51,27 +53,49 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService); } - public Map getDeviceBirthMetrics() { + public Map getDeviceBirthMetrics() { return deviceBirthMetrics; } - public void setDeviceBirthMetrics(java.util.List metrics) { - this.deviceBirthMetrics.putAll(metrics.stream() - .collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric))); + public void setDeviceBirthMetrics(java.util.List metrics) { + for (var metric : metrics) { + if (metric.hasName()) { + this.deviceBirthMetrics.put(metric.getName(), metric); + } else { + throw new IllegalArgumentException("The metric name of device: '" + this.getDeviceInfo().getDeviceName() + "' must not be empty or null! Metric: [" + metric + "]"); + } + if (metric.hasAlias() && this.parent.getNodeAlias().putIfAbsent(metric.getAlias(), metric.getName()) != null) { + throw new DuplicateKeyException("The alias '" + metric.getAlias() + "' already exists in device: '" + this.getDeviceInfo().getDeviceName() + "'"); + } + } } @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); - notification.getSharedUpdatedList().forEach(tsKvProto -> { - if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + notification.getSharedUpdatedList().forEach(tsKvProtoShared -> { + SparkplugMessageType messageType = DCMD; + TransportProtos.TsKvProto tsKvProto = tsKvProtoShared; + if ("JSON_V".equals(tsKvProtoShared.getKv().getType().name())) { + try { + messageType = SparkplugMessageType.parseMessageType(tsKvProtoShared.getKv().getKey()); + tsKvProto = getTsKvProtoFromJsonNode(JacksonUtil.toJsonNode(tsKvProtoShared.getKv().getJsonV()), tsKvProtoShared.getTs()); + } catch (ThingsboardException e) { + messageType = null; + log.error("Failed attributes update notification to sparkplug device [{}]. ", deviceInfo.getDeviceName(), e); + } + } + if (messageType != null && messageType.isSubscribe() && messageType.isDevice() + && getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), - SparkplugMessageType.DCMD, deviceInfo.getDeviceName()); + messageType, deviceInfo.getDeviceName()); parent.createSparkplugMqttPublishMsg(tsKvProto, - sparkplugTopic.toString(), - getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) + sparkplugTopic.toString(), + getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(this.parent::writeAndFlush); + } else { + log.trace("Failed attributes update notification to sparkplug device [{}]. ", deviceInfo.getDeviceName()); } }); } @@ -81,20 +105,22 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId); try { SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); - SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); - header.setMessageType(messageType.name()); - TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); - if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { - SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), - messageType, deviceInfo.getDeviceName()); - parent.createSparkplugMqttPublishMsg(tsKvProto, - sparkplugTopic.toString(), - getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) - .ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo)); - } else { - parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), - ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + - rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); + if (messageType.isSubscribe()) { + SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); + header.setMessageType(messageType.name()); + TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); + if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), + messageType, deviceInfo.getDeviceName()); + parent.createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) + .ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo)); + } else { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + + rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); + } } } catch (ThingsboardException e) { parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 217a1713ca..cdb562c063 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -18,19 +18,15 @@ package org.thingsboard.server.transport.mqtt.session; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; -import com.google.protobuf.Descriptors; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.eclipse.leshan.core.ResponseCode; +import org.springframework.dao.DuplicateKeyException; import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.adaptor.AdaptorException; -import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; @@ -41,7 +37,9 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SpecVersion; import java.util.ArrayList; import java.util.List; @@ -50,26 +48,32 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_SEQUENCE_NUMBER_KEY; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_STATE_REGEXP; /** * Created by nickAS21 on 12.12.22 */ @Slf4j +@SpecVersion(spec = "sparkplug", version = "3.0.0") public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { @Getter private final SparkplugTopic sparkplugTopicNode; @Getter private final Map nodeBirthMetrics; + @Getter + private final Map nodeAlias; private final MqttTransportHandler parent; public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId, @@ -78,25 +82,29 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler(); + this.nodeAlias = new ConcurrentHashMap<>(); } - public void setNodeBirthMetrics(java.util.List metrics) { - this.nodeBirthMetrics.putAll(metrics.stream() - .collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric))); - } - - public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx; - byte[] bytes = getBytes(inbound.payload()); - Descriptors.Descriptor telemetryDynamicMsgDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor()); - try { - return JsonConverter.convertToTelemetryProto(JsonParser.parseString(ProtoConverter.dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor))); - } catch (Exception e) { - log.debug("Failed to decode post telemetry request", e); - throw new AdaptorException(e); + public void setNodeBirthMetrics(java.util.List metrics) throws AdaptorException { + for (var metric : metrics) { + if (metric.hasName()) { + this.nodeBirthMetrics.put(metric.getName(), metric); + } else { + throw new AdaptorException("The metric name of edgeNode: '" + this.sparkplugTopicNode.getEdgeNodeId() + "' must not be empty or null! Metric: [" + metric + "]"); + } + if (metric.hasAlias() && this.nodeAlias.putIfAbsent(metric.getAlias(), metric.getName()) != null) { + throw new AdaptorException("The alias '" + metric.getAlias() + "' already exists in edgeNode: '" + this.sparkplugTopicNode.getEdgeNodeId() + "'"); + } } } + + public boolean onValidateNDEATH(SparkplugBProto.Payload sparkplugBProto) throws ThingsboardException { + return sparkplugBProto.getMetricsCount() == 1 && SPARKPLUG_BD_SEQUENCE_NUMBER_KEY.equals(sparkplugBProto.getMetrics(0).getName()) + && this.nodeBirthMetrics.get(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY) != null + && sparkplugBProto.getMetrics(0).getLongValue() == this.nodeBirthMetrics.get(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY).getLongValue(); + } + public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, SparkplugTopic topic) throws AdaptorException, ThingsboardException { String deviceName = topic.getNodeDeviceName(); checkDeviceName(deviceName); @@ -115,7 +123,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler grantedQoSList, MqttTopicSubscription subscription, - MqttQoS reqQoS) throws ThingsboardException { - SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicFilter()); - if (sparkplugTopic.getGroupId() == null) { - // TODO SUBSCRIBE NameSpace - } else if (sparkplugTopic.getType() == null) { - // TODO SUBSCRIBE GroupId - } else if (sparkplugTopic.isNode()) { - // SUBSCRIBE Node - parent.processAttributesRpcSubscribeSparkplugNode(grantedQoSList, reqQoS); + /** + * Subscribe: spBv1.0/STATE/my_primary_hos -> Implemented as status via checkSparkplugNodeSession + * Subscribe: CMD/DATA -> Implemented after connection: SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG/SUBSCRIBE_TO_RPC_ASYNC_MSG + * @param subscription + * @throws ThingsboardException + */ + public void handleSparkplugSubscribeMsg(MqttTopicSubscription subscription) throws ThingsboardException { + String topic = subscription.topicFilter(); + if (topic != null && topic.startsWith(TOPIC_STATE_REGEXP)) { + log.trace("Subscribing on it’s own spBv1.0/STATE/[the Sparkplug Host Application] - Implemented as status via checkSparkplugNodeSession"); + } else if (this.validateTopicDataSubscribe(topic)) { + // TODO if need subscription DATA + log.trace("Subscribing on it’s own [" + topic + "] - Implemented as SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG/SUBSCRIBE_TO_RPC_ASYNC_MSG via checkSparkplugNode/DeviceSession"); } else { - // SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED. - // TODO: track that node subscribed to # or to particular device. + log.trace("Failed to subscribe to the topic: [" + topic + "]."); } } @@ -192,7 +206,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler onDeviceConnectProto(SparkplugTopic topic) throws ThingsboardException { + private ListenableFuture onDeviceConnectProto(SparkplugTopic topic) throws + ThingsboardException { try { String deviceType = this.gateway.getDeviceType() + " device"; return onDeviceConnect(topic.getNodeDeviceName(), deviceType); @@ -202,22 +217,45 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set attributesMetricNames, String topicTypeName) throws AdaptorException { + /** + * Sparkplug 3.0.0 -> 6.4.6. Metric + * https://sparkplug.eclipse.org/specification/version/3.0/documents/sparkplug-specification-3.0.0.pdf#%5B%7B%22num%22%3A339%2C%22gen%22%3A0%7D%2C%7B%22name%22%3A%22XYZ%22%7D%2C0%2C455.52%2Cnull%5D + * [tck-id-payloads-name-requirement] The name MUST be included with every metric unless aliases are being used. ◦ All UTF-8 characters are allowed in the metric name. However, special characters including but not limited to the following are discouraged: . , \ @ # $ % ^ & * ( ) [ ] { } | ! ` ~ : ; ' " < > ?. This is because many Sparkplug Host Applications may have issues handling them. + * • alias (are optional and not required): + * - This is an unsigned 64-bit integer representing an optional alias for a Sparkplug B payload. + * - If aliases are used, the following rules apply: + * -- [tck-id-payloads-alias-uniqueness] If supplied in an NBIRTH or BIRTH it MUST be a unique number across this Edge Node’s entire set of metrics. + * -- no two metrics for the same Edge Node can have the same alias. + * -- [tck-id-payloads-alias-birth-requirement] NBIRTH and DBIRTH messages MUST include both a metric name and alias. + * -- [tck-id-payloads-alias-data-cmd-requirement] NDATA, DDATA, NCMD, and DCMD messages MUST only include an alias and the metric name MUST be excluded. + * @param sparkplugBProto + * @param attributesMetricNames + * @param topicTypeName + * @return + * @throws AdaptorException + */ + private List convertToPostTelemetry(SparkplugBProto.Payload + sparkplugBProto, Set attributesMetricNames, String topicTypeName) throws AdaptorException { try { List msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) { - long ts = protoMetric.getTimestamp(); - String key = "bdSeq".equals(protoMetric.getName()) ? - topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); - Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); - keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts))); + String metricName = protoMetric.hasName() ? protoMetric.getName() : protoMetric.hasAlias() ? this.nodeAlias.get(protoMetric.getAlias()) : null; + if (metricName == null) { + throw new ThingsboardException("Metric without metricName and alias.", ThingsboardErrorCode.INVALID_ARGUMENTS); + } else { + if (attributesMetricNames == null || !matches(attributesMetricNames, metricName)) { + long ts = protoMetric.getTimestamp(); + String key = SPARKPLUG_BD_SEQUENCE_NUMBER_KEY.equals(protoMetric.getName()) ? + topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); + keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts))); + } } } if (DBIRTH.name().equals(topicTypeName)) { TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder(); - keyValueProtoBuilder.setKey(topicTypeName + " " + "seq"); + keyValueProtoBuilder.setKey(topicTypeName + " " + SPARKPLUG_SEQUENCE_NUMBER_KEY); keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V); keyValueProtoBuilder.setLongV(sparkplugBProto.getSeq()); msgs.add(postTelemetryMsgCreated(keyValueProtoBuilder.build(), sparkplugBProto.getTimestamp())); @@ -235,13 +273,18 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (matches(attributesMetricNames, protoMetric)) { - TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); - Optional msgOpt = getPostAttributeMsg(protoMetric); - if (msgOpt.isPresent()) { - deviceAttributesMsgBuilder.setDeviceName(deviceName); - deviceAttributesMsgBuilder.setMsg(msgOpt.get()); - msgs.add(deviceAttributesMsgBuilder.build()); + String metricName = protoMetric.hasName() ? protoMetric.getName() : protoMetric.hasAlias() ? this.nodeAlias.get(protoMetric.getAlias()) : null; + if (metricName == null) { + throw new ThingsboardException("Metric without metricName and alias.", ThingsboardErrorCode.INVALID_ARGUMENTS); + } else { + if (matches(attributesMetricNames, metricName)) { + TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); + Optional msgOpt = getPostAttributeMsg(protoMetric); + if (msgOpt.isPresent()) { + deviceAttributesMsgBuilder.setDeviceName(deviceName); + deviceAttributesMsgBuilder.setMsg(msgOpt.get()); + msgs.add(deviceAttributesMsgBuilder.build()); + } } } } @@ -252,8 +295,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) { - String metricName = protoMetric.getName(); + private boolean matches(Set attributesMetricNames, String metricName) { for (String attributeMetricFilter : attributesMetricNames) { if (metricName.equals(attributeMetricFilter) || (attributeMetricFilter.endsWith("*") && metricName.startsWith( @@ -264,7 +306,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { + private Optional getPostAttributeMsg(SparkplugBProto.Payload.Metric + protoMetric) throws ThingsboardException { Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); if (keyValueProtoOpt.isPresent()) { TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder(); @@ -285,7 +328,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler= 4 && splitTopic.length <= 5 && + splitTopic[0].equals(this.sparkplugTopicNode.getNamespace()) && + splitTopic[1].equals(this.sparkplugTopicNode.getGroupId()) && + splitTopic[3].equals(this.sparkplugTopicNode.getEdgeNodeId())) { + SparkplugMessageType messageType = parseMessageType(splitTopic[2]); + return messageType.isData(); + } + return false; } - } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/DeviceDescriptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/DeviceDescriptor.java new file mode 100644 index 0000000000..cf684884f3 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/DeviceDescriptor.java @@ -0,0 +1,77 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +public class DeviceDescriptor extends EdgeNodeDescriptor { + + private final String deviceId; + private final String descriptorString; + + public DeviceDescriptor(String groupId, String edgeNodeId, String deviceId) { + super(groupId, edgeNodeId); + this.deviceId = deviceId; + this.descriptorString = groupId + "/" + edgeNodeId + "/" + deviceId; + } + + public DeviceDescriptor(String descriptorString) { + super(descriptorString.substring(0, descriptorString.lastIndexOf("/"))); + this.deviceId = descriptorString.substring(descriptorString.lastIndexOf("/") + 1); + this.descriptorString = descriptorString; + } + + public DeviceDescriptor(EdgeNodeDescriptor edgeNodeDescriptor, String deviceId) { + super(edgeNodeDescriptor.getGroupId(), edgeNodeDescriptor.getEdgeNodeId()); + this.deviceId = deviceId; + this.descriptorString = edgeNodeDescriptor.getDescriptorString() + "/" + deviceId; + } + + public String getDeviceId() { + return deviceId; + } + + /** + * Returns a {@link String} representing the Device's Descriptor of the form: + * "//". + * + * @return a {@link String} representing the Device's Descriptor. + */ + @Override + public String getDescriptorString() { + return descriptorString; + } + + public String getEdgeNodeDescriptorString() { + return super.getDescriptorString(); + } + + @Override + public int hashCode() { + return this.getDescriptorString().hashCode(); + } + + @Override + public boolean equals(Object object) { + if (object instanceof DeviceDescriptor) { + return this.getDescriptorString().equals(((DeviceDescriptor) object).getDescriptorString()); + } + return this.getDescriptorString().equals(object); + } + + @Override + public String toString() { + return getDescriptorString(); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/EdgeNodeDescriptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/EdgeNodeDescriptor.java new file mode 100644 index 0000000000..c8ea2c457e --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/EdgeNodeDescriptor.java @@ -0,0 +1,80 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.annotation.JsonValue; + +public class EdgeNodeDescriptor implements SparkplugDescriptor{ + + private final String groupId; + private final String edgeNodeId; + private final String descriptorString; + + public EdgeNodeDescriptor(String groupId, String edgeNodeId) { + this.groupId = groupId; + this.edgeNodeId = edgeNodeId; + this.descriptorString = groupId + "/" + edgeNodeId; + } + + /** + * Creates and EdgeNodeDescriptor from a {@link String} of the form group_name/edge_node_name + * + * @param descriptorString the {@link String} representation of an EdgeNodeDescriptor + */ + public EdgeNodeDescriptor(String descriptorString) { + String[] tokens = descriptorString.split("/"); + this.groupId = tokens[0]; + this.edgeNodeId = tokens[1]; + this.descriptorString = descriptorString; + } + + public String getGroupId() { + return groupId; + } + + public String getEdgeNodeId() { + return edgeNodeId; + } + + /** + * Returns a {@link String} representing the Edge Node's Descriptor of the form: "/". + * + * @return a {@link String} representing the Edge Node's Descriptor. + */ + @Override + public String getDescriptorString() { + return descriptorString; + } + + @Override + public int hashCode() { + return this.getDescriptorString().hashCode(); + } + + @Override + public boolean equals(Object object) { + if (object instanceof EdgeNodeDescriptor) { + return this.getDescriptorString().equals(((EdgeNodeDescriptor) object).getDescriptorString()); + } + return this.getDescriptorString().equals(object); + } + + @Override + @JsonValue + public String toString() { + return getDescriptorString(); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java index c500d5ddd6..fd0bfb98cf 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java @@ -52,7 +52,24 @@ public enum MetricDataType { File(18, SparkplugMetricUtil.File.class), Template(19, SparkplugBProto.Payload.Template.class), - // PropertyValue Types (20 and 21) are NOT metric datatypes + // Additional PropertyValue Types (PropertyValue Types (20 and 21) are NOT metric datatypes) + PropertySet(20, SparkplugBProto.Payload.PropertySet.class), + PropertySetList(21, SparkplugBProto.Payload.PropertySetList.class), + + // Array Types + Int8Array(22, Byte[].class), + Int16Array(23, Short[].class), + Int32Array(24, Integer[].class), + Int64Array(25, Long[].class), + UInt8Array(26, Short[].class), + UInt16Array(27, Integer[].class), + UInt32Array(28, Long[].class), + UInt64Array(29, BigInteger[].class), + FloatArray(30, Float[].class), + DoubleArray(31, Double[].class), + BooleanArray(32, Boolean[].class), + StringArray(33, String[].class), + DateTimeArray(34, Date[].class), // Unknown Unknown(0, Object.class); @@ -140,6 +157,36 @@ public enum MetricDataType { return File; case 19: return Template; + case 20: + return PropertySet; + case 21: + return PropertySetList; + case 22: + return Int8Array; + case 23: + return Int16Array; + case 24: + return Int32Array; + case 25: + return Int64Array; + case 26: + return UInt8Array; + case 27: + return UInt16Array; + case 28: + return UInt32Array; + case 29: + return UInt64Array; + case 30: + return FloatArray; + case 31: + return DoubleArray; + case 32: + return BooleanArray; + case 33: + return StringArray; + case 34: + return DateTimeArray; default: return Unknown; } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugDescriptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugDescriptor.java new file mode 100644 index 0000000000..9deff97bf4 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugDescriptor.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +public interface SparkplugDescriptor { + + /** + * Returns the String representation of this {@link SparkplugDescriptor} + * + * @return the String representation of this {@link SparkplugDescriptor} + */ + public String getDescriptorString(); +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java index deb04034f2..f5b0929a46 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java @@ -91,10 +91,14 @@ public enum SparkplugMessageType { return STATE.equals(type) ? "sparkplugConnectionState" : type.name(); } + public boolean isState() { + return this.equals(STATE); + } + public boolean isDeath() { return this.equals(DDEATH) || this.equals(NDEATH); } - + public boolean isCommand() { return this.equals(DCMD) || this.equals(NCMD); } @@ -110,4 +114,19 @@ public enum SparkplugMessageType { public boolean isRecord() { return this.equals(DRECORD) || this.equals(NRECORD); } + public boolean isSubscribe() { + return isCommand() || isData() || isRecord(); + } + + public boolean isNode() { + return this.equals(NBIRTH) + || this.equals(NCMD) || this.equals(NDATA) + ||this.equals(NDEATH) || this.equals(NRECORD); + } + public boolean isDevice() { + return this.equals(DBIRTH) + || this.equals(DCMD) || this.equals(DDATA) + ||this.equals(DDEATH) || this.equals(DRECORD); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index a6a071a740..f005235d53 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; +import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric; +import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric.Builder; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -45,6 +47,9 @@ import static org.thingsboard.common.util.JacksonUtil.newArrayNode; @Slf4j public class SparkplugMetricUtil { + public static final String SPARKPLUG_SEQUENCE_NUMBER_KEY = "seq"; + public static final String SPARKPLUG_BD_SEQUENCE_NUMBER_KEY = "bdSeq"; + public static Optional fromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { // Check if the null flag has been set indicating that the value is null if (protoMetric.getIsNull()) { @@ -141,13 +146,20 @@ public class SparkplugMetricUtil { return Optional.empty(); } } + public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType, Long alias) throws ThingsboardException { + Builder metric = Metric.newBuilder(); + metric.setTimestamp(ts) + .setDatatype(metricDataType.toIntValue()); + if (alias >= 0) { + metric.setAlias(alias); + } + if (StringUtils.isNotBlank(key)) { + metric.setName(key); + } + return addToMetricValue(value, metric.build(), metricDataType); + } - public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException { - SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder() - .setTimestamp(ts) - .setName(key) - .setDatatype(metricDataType.toIntValue()) - .build(); + public static SparkplugBProto.Payload.Metric addToMetricValue(Object value, SparkplugBProto.Payload.Metric metric, MetricDataType metricDataType) throws ThingsboardException { switch (metricDataType) { case Int8: // (byte) return metric.toBuilder().setIntValue(((Byte) value).intValue()).build(); @@ -189,6 +201,12 @@ public class SparkplugMetricUtil { return metric; } + public static TransportProtos.TsKvProto getTsKvProtoFromJsonNode(JsonNode kvProto, long ts) throws ThingsboardException { + String kvProtoKey = kvProto.fieldNames().next(); + String kvProtoValue = kvProto.get(kvProtoKey).asText(); + return getTsKvProto(kvProtoKey, kvProtoValue, ts); + } + public static TransportProtos.TsKvProto getTsKvProto(String key, Object value, long ts) throws ThingsboardException { try { TransportProtos.TsKvProto.Builder tsKvProtoBuilder = TransportProtos.TsKvProto.newBuilder(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java index 278ea52ccb..6901cf8078 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java @@ -15,65 +15,129 @@ */ package org.thingsboard.server.transport.mqtt.util.sparkplug; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; + +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP; /** * Created by nickAS21 on 12.12.22 + * A Sparkplug MQTT Topic */ @JsonInclude(JsonInclude.Include.NON_NULL) public class SparkplugTopic { /** * The Sparkplug namespace version. - * For the Sparkplug™ B version of the specification, the UTF-8 string constant for the namespace element will be: “spBv1.0” */ - private String namespace; + private final String namespace; + + /** + * The SparkplugDesciptor for this Edge Node or Device + */ + @JsonIgnore + private final SparkplugDescriptor sparkplugDescriptor; + + /** + * The {@link EdgeNodeDescriptor} for this Edge Node or Device + */ + private final EdgeNodeDescriptor edgeNodeDescriptor; /** * The ID of the logical grouping of Edge of Network (EoN) Nodes and devices. */ - private String groupId; + private final String groupId; /** * The ID of the Edge of Network (EoN) Node. */ - private String edgeNodeId; + private final String edgeNodeId; /** * The ID of the device. */ private String deviceId; + /** + * The ID if this is a Sparkplug Host Application topic + */ + private final String hostApplicationId; + /** * The message type. */ - private SparkplugMessageType type; + private final SparkplugMessageType type; - /** - * Constructor (device). + public SparkplugTopic() { + this.namespace = null; + this.sparkplugDescriptor = null; + this.edgeNodeDescriptor = null; + this.groupId = null; + this.edgeNodeId = null; + this.deviceId = null; + this.hostApplicationId = null; + this.type = null; + } + + public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) { + super(); + this.namespace = sparkplugTopic.namespace; + this.groupId = sparkplugTopic.groupId; + this.edgeNodeId = sparkplugTopic.edgeNodeId; + this.sparkplugDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); + this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); + this.deviceId = null; + this.type = type; + this.hostApplicationId = null; + } + + public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) { + super(); + this.namespace = sparkplugTopic.namespace; + this.groupId = sparkplugTopic.groupId; + this.edgeNodeId = sparkplugTopic.edgeNodeId; + this.deviceId = deviceId; + this.sparkplugDescriptor = deviceId == null + ? new EdgeNodeDescriptor(groupId, edgeNodeId) + : new DeviceDescriptor(groupId, edgeNodeId, deviceId); + this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); + this.type = type; + this.hostApplicationId = null; + } + + /** + * A Constructor for Device Topics * - * @param namespace the namespace. - * @param groupId the group ID. - * @param edgeNodeId the edge node ID. - * @param deviceId the device ID. - * @param type the message type. + * @param namespace the namespace + * @param groupId the Group ID + * @param edgeNodeId the Edge Node ID + * @param deviceId the Device ID + * @param type the message type */ public SparkplugTopic(String namespace, String groupId, String edgeNodeId, String deviceId, SparkplugMessageType type) { super(); this.namespace = namespace; + this.sparkplugDescriptor = deviceId == null + ? new EdgeNodeDescriptor(groupId, edgeNodeId) + : new DeviceDescriptor(groupId, edgeNodeId, deviceId); + this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); this.groupId = groupId; this.edgeNodeId = edgeNodeId; this.deviceId = deviceId; + this.hostApplicationId = null; this.type = type; } /** - * Constructor (node). - * - * @param namespace the namespace. - * @param groupId the group ID. - * @param edgeNodeId the edge node ID. - * @param type the message type. + * A Constructor for Edge Node Topics + * @param namespace the namespace + * @param groupId the group ID + * @param edgeNodeId the edge node ID + * @param type the message type */ public SparkplugTopic(String namespace, String groupId, String edgeNodeId, SparkplugMessageType type) { super(); @@ -81,33 +145,110 @@ public class SparkplugTopic { this.groupId = groupId; this.edgeNodeId = edgeNodeId; this.deviceId = null; - this.type = type; - } - - public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) { - super(); - this.namespace = sparkplugTopic.namespace; - this.groupId = sparkplugTopic.groupId; - this.edgeNodeId = sparkplugTopic.edgeNodeId; - this.deviceId = null; - this.type = type; - } - public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) { - super(); - this.namespace = sparkplugTopic.namespace; - this.groupId = sparkplugTopic.groupId; - this.edgeNodeId = sparkplugTopic.edgeNodeId; - this.deviceId = deviceId; + this.sparkplugDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); + this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId); + this.hostApplicationId = null; this.type = type; } /** - * @return the Sparkplug namespace version + * A Constructor for Device Topics + * + * @param namespace the namespace + * @param deviceDescriptor the {@link EdgeNodeDescriptor} + * @param type the message type + */ + public SparkplugTopic(String namespace, DeviceDescriptor deviceDescriptor, SparkplugMessageType type) { + this(namespace, deviceDescriptor.getGroupId(), deviceDescriptor.getEdgeNodeId(), deviceDescriptor.getDeviceId(), + type); + } + + /** + * A Constructor for Edge Node Topics + * + * @param namespace the namespace + * @param edgeNodeDescriptor the {@link EdgeNodeDescriptor} + * @param type the message type + */ + public SparkplugTopic(String namespace, EdgeNodeDescriptor edgeNodeDescriptor, SparkplugMessageType type) { + this(namespace, edgeNodeDescriptor.getGroupId(), edgeNodeDescriptor.getEdgeNodeId(), type); + } + + /** + * A Constructor for Host Application Topics + * + * @param namespace the namespace + * @param hostApplicationId the Host Application ID + */ + public SparkplugTopic(String namespace, String hostApplicationId, SparkplugMessageType type) { + super(); + this.namespace = namespace; + this.hostApplicationId = hostApplicationId; + this.type = type; + this.sparkplugDescriptor = null; + this.edgeNodeDescriptor = null; + this.groupId = null; + this.edgeNodeId = null; + this.deviceId = null; + } + + public static SparkplugTopic parseTopic(String topicString) throws ThingsboardException { + try { + if (isValidIdElementToUTF8(topicString)) { + SparkplugMessageType messageType; + String[] splitTopic = topicString.split(TOPIC_SPLIT_REGEXP); + if (TOPIC_ROOT_SPB_V_1_0.equals(splitTopic[0])) { + if (splitTopic.length == 3) { + messageType = parseMessageType(splitTopic[1]); + if (messageType.isState()) + return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[2], messageType); + } else if (splitTopic.length == 4) { + messageType = parseMessageType(splitTopic[2]); + if (messageType.isNode()) + return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[1], splitTopic[3], messageType); + } else if (splitTopic.length == 5) { + messageType = parseMessageType(splitTopic[2]); + if (messageType.isDevice()) + return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[1], splitTopic[3], splitTopic[4], messageType); + + } + } + } + throw new ThingsboardException("Invalid Sparkplug topic from String: " + topicString, ThingsboardErrorCode.INVALID_ARGUMENTS); + } catch ( + Exception e) { + throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS); + } + + } + + /** + * Returns the Sparkplug namespace version. + * + * @return the namespace */ public String getNamespace() { return namespace; } + /** + * Returns the {@link SparkplugDescriptor} + * + * @return the SparkplugDescriptor + */ + public SparkplugDescriptor getSparkplugDescriptor() { + return sparkplugDescriptor; + } + + /** + * Returns the {@link EdgeNodeDescriptor} + * + * @return the EdgeNodeDescriptor + */ + public EdgeNodeDescriptor getEdgeNodeDescriptor() { + return edgeNodeDescriptor; + } + /** * Returns the ID of the logical grouping of Edge of Network (EoN) Nodes and devices. * @@ -118,20 +259,39 @@ public class SparkplugTopic { } /** - * @return the ID of the Edge of Network (EoN) Node + * Returns the ID of the Edge of Network (EoN) Node. + * + * @return the edge node ID */ public String getEdgeNodeId() { return edgeNodeId; } /** + * Returns the ID of the device. + * * @return the device ID */ public String getDeviceId() { return deviceId; } + public void updateDeviceIdPlus(String deviceIdNew) { + this.deviceId = this.deviceId.equals("+") ? deviceIdNew : this.deviceId; + } + /** + * Returns the Host Application ID if this is a Host topic + * + * @return the Host Application ID + */ + public String getHostApplicationId() { + return hostApplicationId; + } + + /** + * Returns the message type. + * * @return the message type */ public SparkplugMessageType getType() { @@ -140,12 +300,15 @@ public class SparkplugTopic { @Override public String toString() { - StringBuilder sb = new StringBuilder(getNamespace()).append("/") - .append(getGroupId()).append("/") - .append(getType()).append("/") - .append(getEdgeNodeId()); - if (getDeviceId() != null) { - sb.append("/").append(getDeviceId()); + StringBuilder sb = new StringBuilder(); + if (hostApplicationId == null) { + sb.append(getNamespace()).append("/").append(getGroupId()).append("/").append(getType()).append("/") + .append(getEdgeNodeId()); + if (getDeviceId() != null) { + sb.append("/").append(getDeviceId()); + } + } else { + sb.append(getNamespace()).append("/").append(getType()).append("/").append(hostApplicationId); } return sb.toString(); } @@ -165,5 +328,13 @@ public class SparkplugTopic { public String getNodeDeviceName() { return isNode() ? edgeNodeId : deviceId; } + + public static boolean isValidIdElementToUTF8(String deviceIdElement) { + if (deviceIdElement == null) { + return false; + } + String regex = "^(?!.*//)[^+#]*$"; + return deviceIdElement.matches(regex); + } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java new file mode 100644 index 0000000000..a6e2e34553 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicService.java @@ -0,0 +1,67 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; + +import java.util.HashMap; +import java.util.Map; + +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic.parseTopic; + +@Slf4j +@Service +@TbMqttTransportComponent +public class SparkplugTopicService { + + private static final Map SPLIT_TOPIC_CACHE = new HashMap<>(); + public static final String TOPIC_ROOT_SPB_V_1_0 = "spBv1.0"; + public static final String TOPIC_ROOT_CERT_SP = "$sparkplug/certificates/"; + public static final String TOPIC_SPLIT_REGEXP = "/"; + public static final String TOPIC_STATE_REGEXP = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP; + + public static SparkplugTopic getSplitTopic(String topic) throws ThingsboardException { + SparkplugTopic sparkplugTopic = SPLIT_TOPIC_CACHE.get(topic); + if (sparkplugTopic == null) { + // validation topic + sparkplugTopic = parseTopic(topic); + SPLIT_TOPIC_CACHE.put(topic, sparkplugTopic); + } + return sparkplugTopic; + } + + /** + * all ID Element MUST be a UTF-8 string + * and with the exception of the reserved characters of + (plus), / (forward slash). + * Publish: $sparkplug/certificates/spBv1.0/G1/NBIRTH/E1 + * Publish: spBv1.0/G1/NBIRTH/E1 + * Publish: $sparkplug/certificates/spBv1.0/G1/DBIRTH/E1/D1 + * Publish: spBv1.0/G1/DBIRTH/E1/D1 + * @param topic + * @return + * @throws ThingsboardException + */ + public static SparkplugTopic parseTopicPublish(String topic) throws ThingsboardException { + topic = topic.startsWith(TOPIC_ROOT_CERT_SP) ? topic.substring(TOPIC_ROOT_CERT_SP.length()) : topic; + topic = topic.indexOf("+") > 0 ? topic.substring(0, topic.indexOf("+")): topic; + return getSplitTopic(topic); + } +} + diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java deleted file mode 100644 index aaaedcda60..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopicUtil.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util.sparkplug; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; -import org.thingsboard.server.common.data.exception.ThingsboardException; - -import java.util.HashMap; -import java.util.Map; - -/** - * Provides utility methods for handling Sparkplug MQTT message topics. - */ -public class SparkplugTopicUtil { - - private static final Map SPLIT_TOPIC_CACHE = new HashMap(); - private static final String TOPIC_INVALID_NUMBER = "Invalid number of topic elements: "; - public static final String NAMESPACE = "spBv1.0"; - - public static String[] getSplitTopic(String topic) { - String[] splitTopic = SPLIT_TOPIC_CACHE.get(topic); - if (splitTopic == null) { - splitTopic = topic.split("/"); - SPLIT_TOPIC_CACHE.put(topic, splitTopic); - } - - return splitTopic; - } - - /** - * Serializes a {@link SparkplugTopic} instance in to a JSON string. - * - * @param topic a {@link SparkplugTopic} instance - * @return a JSON string - * @throws JsonProcessingException - */ - public static String sparkplugTopicToString(SparkplugTopic topic) throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.writeValueAsString(topic); - } - - /** - * Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance. - * - * @param topic a topic string - * @return a {@link SparkplugTopic} instance - * @throws ThingsboardException if an error occurs while parsing - */ - public static SparkplugTopic parseTopicSubscribe(String topic) throws ThingsboardException { - // TODO "+", "$" - topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic; - return parseTopic(SparkplugTopicUtil.getSplitTopic(topic)); - } - - public static SparkplugTopic parseTopicPublish(String topic) throws ThingsboardException { - if (topic.contains("#") || topic.contains("$") || topic.contains("+")) { - throw new ThingsboardException("Invalid of topic elements for Publish", ThingsboardErrorCode.INVALID_ARGUMENTS); - } else { - String[] splitTopic = SparkplugTopicUtil.getSplitTopic(topic); - if (splitTopic.length < 4 || splitTopic.length > 5) { - throw new ThingsboardException(TOPIC_INVALID_NUMBER + splitTopic.length, ThingsboardErrorCode.INVALID_ARGUMENTS); - } - return parseTopic(splitTopic); - } - } - - /** - * Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance. - * - * @param splitTopic a topic split into tokens - * @return a {@link SparkplugTopic} instance - * @throws Exception if an error occurs while parsing - */ - @SuppressWarnings("incomplete-switch") - public static SparkplugTopic parseTopic(String[] splitTopic) throws ThingsboardException { - int length = splitTopic.length; - if (length == 0) { - throw new ThingsboardException(TOPIC_INVALID_NUMBER + length, ThingsboardErrorCode.INVALID_ARGUMENTS); - } else { - SparkplugMessageType type; - String namespace, edgeNodeId, groupId, deviceId; - namespace = validateNameSpace(splitTopic[0]); - groupId = length > 1 ? splitTopic[1] : null; - type = length > 2 ? SparkplugMessageType.parseMessageType(splitTopic[2]) : null; - edgeNodeId = length > 3 ? splitTopic[3] : null; - deviceId = length > 4 ? splitTopic[4] : null; - return new SparkplugTopic(namespace, groupId, edgeNodeId, deviceId, type); - } - } - - /** - * For the Sparkplug™ B version of the specification, the UTF-8 string constant for the namespace element will be: "spBv1.0" - * @param nameSpace - * @return - */ - private static String validateNameSpace(String nameSpace) throws ThingsboardException { - if (NAMESPACE.equals(nameSpace)) return nameSpace; - throw new ThingsboardException("The namespace [" + nameSpace + "] is not valid and must be [" + NAMESPACE + "] for the Sparkplug™ B version.", ThingsboardErrorCode.INVALID_ARGUMENTS); - } - -} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SpecVersion.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SpecVersion.java new file mode 100644 index 0000000000..07f5cbf53f --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SpecVersion.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +@Documented +@Target(ElementType.TYPE) +public @interface SpecVersion { + String spec() default ""; + String version(); +} \ No newline at end of file diff --git a/common/transport/mqtt/src/main/proto/sparkplug.proto b/common/transport/mqtt/src/main/proto/sparkplug.proto index 8b5bc6fd22..c28a4e6410 100644 --- a/common/transport/mqtt/src/main/proto/sparkplug.proto +++ b/common/transport/mqtt/src/main/proto/sparkplug.proto @@ -21,51 +21,71 @@ import "google/protobuf/any.proto"; option java_package = "org.thingsboard.server.gen.transport.mqtt"; option java_outer_classname = "SparkplugBProto"; -message Payload { - /* - // Indexes of Data Types - // Unknown placeholder for future expansion. - Unknown = 0; - // Basic Types - Int8 = 1; - Int16 = 2; - Int32 = 3; - Int64 = 4; - UInt8 = 5; - UInt16 = 6; - UInt32 = 7; - UInt64 = 8; - Float = 9; - Double = 10; - Boolean = 11; - String = 12; - DateTime = 13; - Text = 14; - // Additional Metric Types - UUID = 15; - DataSet = 16; - Bytes = 17; - File = 18; - Template = 19; - // Additional PropertyValue Types - PropertySet = 20; - PropertySetList = 21; - */ +enum DataType { + // Indexes of Data Types + + // Unknown placeholder for future expansion. + Unknown = 0; + + // Basic Types + Int8 = 1; + Int16 = 2; + Int32 = 3; + Int64 = 4; + UInt8 = 5; + UInt16 = 6; + UInt32 = 7; + UInt64 = 8; + Float = 9; + Double = 10; + Boolean = 11; + String = 12; + DateTime = 13; + Text = 14; + + // Additional Metric Types + UUID = 15; + DataSet = 16; + Bytes = 17; + File = 18; + Template = 19; + + // Additional PropertyValue Types + PropertySet = 20; + PropertySetList = 21; + + // Array Types + Int8Array = 22; + Int16Array = 23; + Int32Array = 24; + Int64Array = 25; + UInt8Array = 26; + UInt16Array = 27; + UInt32Array = 28; + UInt64Array = 29; + FloatArray = 30; + DoubleArray = 31; + BooleanArray = 32; + StringArray = 33; + DateTimeArray = 34; +} + +message Payload { message Template { message Parameter { - optional string name = 1; - optional uint32 type = 2; + optional string name = 1; + optional uint32 type = 2; oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; ParameterValueExtension extension_value = 9; } @@ -74,12 +94,12 @@ message Payload { } } - optional string version = 1; // The version of the Template to prevent mismatches - repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value - repeated Parameter parameters = 3; - optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance - optional bool is_definition = 5; - google.protobuf.Any extensions = 6; + optional string version = 1; // The version of the Template to prevent mismatches + repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value + repeated Parameter parameters = 3; + optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance + optional bool is_definition = 5; + google.protobuf.Any extensions = 6; } message DataSet { @@ -87,118 +107,118 @@ message Payload { message DataSetValue { oneof value { - uint32 int_value = 1; - uint64 long_value = 2; - float float_value = 3; - double double_value = 4; - bool boolean_value = 5; - string string_value = 6; - DataSetValueExtension extension_value = 7; + uint32 int_value = 1; + uint64 long_value = 2; + float float_value = 3; + double double_value = 4; + bool boolean_value = 5; + string string_value = 6; + DataSetValueExtension extension_value = 7; } message DataSetValueExtension { - google.protobuf.Any extensions = 1; + google.protobuf.Any extensions = 1; } } message Row { - repeated DataSetValue elements = 1; - google.protobuf.Any extensions = 2; // For third party extensions + repeated DataSetValue elements = 1; + google.protobuf.Any extensions = 2; // For third party extensions } - optional uint64 num_of_columns = 1; - repeated string columns = 2; - repeated uint32 types = 3; - repeated Row rows = 4; - google.protobuf.Any extensions = 5; // For third party extensions + optional uint64 num_of_columns = 1; + repeated string columns = 2; + repeated uint32 types = 3; + repeated Row rows = 4; + google.protobuf.Any extensions = 5; // For third party extensions } message PropertyValue { - optional uint32 type = 1; - optional bool is_null = 2; + optional uint32 type = 1; + optional bool is_null = 2; oneof value { - uint32 int_value = 3; - uint64 long_value = 4; - float float_value = 5; - double double_value = 6; - bool boolean_value = 7; - string string_value = 8; - PropertySet propertyset_value = 9; - PropertySetList propertysets_value = 10; // List of Property Values - PropertyValueExtension extension_value = 11; + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + PropertySet propertyset_value = 9; + PropertySetList propertysets_value = 10; // List of Property Values + PropertyValueExtension extension_value = 11; } message PropertyValueExtension { - google.protobuf.Any extensions = 1; + google.protobuf.Any extensions = 1; } } message PropertySet { - repeated string keys = 1; // Names of the properties - repeated PropertyValue values = 2; - google.protobuf.Any extensions = 3; + repeated string keys = 1; // Names of the properties + repeated PropertyValue values = 2; + google.protobuf.Any extensions = 3; } message PropertySetList { repeated PropertySet propertyset = 1; - google.protobuf.Any extensions = 2; + google.protobuf.Any extensions = 2; } message MetaData { // Bytes specific metadata - optional bool is_multi_part = 1; + optional bool is_multi_part = 1; // General metadata - optional string content_type = 2; // Content/Media type - optional uint64 size = 3; // File size, String size, Multi-part size, etc - optional uint64 seq = 4; // Sequence number for multi-part messages + optional string content_type = 2; // Content/Media type + optional uint64 size = 3; // File size, String size, Multi-part size, etc + optional uint64 seq = 4; // Sequence number for multi-part messages // File metadata - optional string file_name = 5; // File name - optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) - optional string md5 = 7; // md5 of data + optional string file_name = 5; // File name + optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) + optional string md5 = 7; // md5 of data // Catchalls and future expansion - optional string description = 8; // Could be anything such as json or xml of custom properties - google.protobuf.Any extensions = 9; + optional string description = 8; // Could be anything such as json or xml of custom properties + google.protobuf.Any extensions = 9; } message Metric { - optional string name = 1; // Metric name - should only be included on birth - optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages - optional uint64 timestamp = 3; // Timestamp associated with data acquisition time - optional uint32 datatype = 4; // DataType of the metric/tag value - optional bool is_historical = 5; // If this is historical data and should not update real time tag - optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag - optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. - optional MetaData metadata = 8; // Metadata for the payload + optional string name = 1; // Metric name - should only be included on birth + optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages + optional uint64 timestamp = 3; // Timestamp associated with data acquisition time + optional uint32 datatype = 4; // DataType of the metric/tag value + optional bool is_historical = 5; // If this is historical data and should not update real time tag + optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag + optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. + optional MetaData metadata = 8; // Metadata for the payload optional PropertySet properties = 9; oneof value { - uint32 int_value = 10; - uint64 long_value = 11; - float float_value = 12; - double double_value = 13; - bool boolean_value = 14; - string string_value = 15; - bytes bytes_value = 16; // Bytes, File - DataSet dataset_value = 17; - Template template_value = 18; - MetricValueExtension extension_value = 19; + uint32 int_value = 10; + uint64 long_value = 11; + float float_value = 12; + double double_value = 13; + bool boolean_value = 14; + string string_value = 15; + bytes bytes_value = 16; // Bytes, File + DataSet dataset_value = 17; + Template template_value = 18; + MetricValueExtension extension_value = 19; } message MetricValueExtension { - google.protobuf.Any extensions = 1; + google.protobuf.Any extensions = 1; } } - optional uint64 timestamp = 1; // Timestamp at message sending time - repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs - optional uint64 seq = 3; // Sequence number - optional string uuid = 4; // UUID to track message type in terms of schema definitions - optional bytes body = 5; // To optionally bypass the whole definition above - google.protobuf.Any extensions = 6; + optional uint64 timestamp = 1; // Timestamp at message sending time + repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs + optional uint64 seq = 3; // Sequence number + optional string uuid = 4; // UUID to track message type in terms of schema definitions + optional bytes body = 5; // To optionally bypass the whole definition above + google.protobuf.Any extensions = 6; } \ No newline at end of file