Merge pull request #12502 from thingsboard/sparkplug_3_0

sparkplug: ver 3.0
This commit is contained in:
Andrew Shvayka 2025-06-03 15:18:22 +03:00 committed by GitHub
commit 02672b37ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1230 additions and 430 deletions

View File

@ -29,6 +29,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -67,7 +68,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt64;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt8;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
/**
* Created by nickAS21 on 12.01.23
@ -112,31 +113,44 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
long value = bdSeq = 0;
clientWithCorrectNodeAccessTokenWithNDEATH(ts, value);
}
public void clientWithCorrectNodeAccessTokenWithNDEATH(Long alias) throws Exception {
long ts = calendar.getTimeInMillis();
long value = bdSeq = 0;
clientMqttV5ConnectWithNDEATH(ts, value,alias);
}
public void clientWithCorrectNodeAccessTokenWithNDEATH(long ts, long value) throws Exception {
IMqttToken connectionResult = clientConnectWithNDEATH(ts, value);
IMqttToken connectionResult = clientMqttV5ConnectWithNDEATH(ts, value, -1L);
MqttWireMessage response = connectionResult.getResponse();
Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType());
MqttConnAck connAckMsg = (MqttConnAck) response;
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
}
public IMqttToken clientConnectWithNDEATH(long ts, long value, String... nameSpaceBad) throws Exception {
String key = keysBdSeq;
public IMqttToken clientMqttV5ConnectWithNDEATH(long ts, long value, Long alias, String... nameSpaceBad) throws Exception {
return clientMqttV5ConnectWithNDEATH(ts, value, null, alias, nameSpaceBad);
}
public IMqttToken clientMqttV5ConnectWithNDEATH(long ts, long value, String metricName, Long alias, String... nameSpaceBad) throws Exception {
String key = metricName == null ? keysBdSeq : metricName;
MetricDataType metricDataType = Int64;
SparkplugBProto.Payload.Builder deathPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
deathPayload.addMetrics(createMetric(value, ts, key, metricDataType));
deathPayload.addMetrics(createMetric(value, ts, key, metricDataType, alias));
byte[] deathBytes = deathPayload.build().toByteArray();
this.client = new MqttV5TestClient();
this.mqttCallback = new SparkplugMqttCallback();
this.client.setCallback(this.mqttCallback);
MqttConnectionOptions options = new MqttConnectionOptions();
// If the MQTT client is using MQTT v5.0, the Edge Nodes MQTT CONNECT packet MUST set the Clean Start flag to true and the Session Expiry Interval to 0
options.setCleanStart(true);
options.setSessionExpiryInterval(0L);
options.setUserName(gatewayAccessToken);
String nameSpace = nameSpaceBad.length == 0 ? NAMESPACE : nameSpaceBad[0];
String nameSpace = nameSpaceBad.length == 0 ? TOPIC_ROOT_SPB_V_1_0 : nameSpaceBad[0];
String topic = nameSpace + "/" + groupId + "/" + SparkplugMessageType.NDEATH.name() + "/" + edgeNode;
// The NDEATH message MUST set the MQTT Will QoS to 1 and Retained flag to false
MqttMessage msg = new MqttMessage();
msg.setId(0);
msg.setQos(1);
msg.setPayload(deathBytes);
options.setWill(topic, msg);
return client.connect(options);
@ -148,19 +162,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
MetricDataType metricDataType = Int32;
String key = "Node Metric int32";
int valueDeviceInt32 = 1024;
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType, -1L);
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getBdSeqNum());
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
valueDeviceInt32 = 4024;
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32);
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32, -1L);
for (int i = 0; i < cntDevices; i++) {
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
@ -169,7 +183,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
@ -187,6 +201,49 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return devices;
}
protected List<Device> connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(long ts) throws Exception {
List<Device> devices = new ArrayList<>();
Long alias = 0L;
clientWithCorrectNodeAccessTokenWithNDEATH(alias++);
MetricDataType metricDataType = Int32;
String key = "Node Metric int32";
int valueDeviceInt32 = 1024;
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType, alias++);
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getBdSeqNum());
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
valueDeviceInt32 = 4024;
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32, alias++);
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getSeqNum());
String deviceName = deviceId + "_" + 1;
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
});
devices.add(device.get());
}
Assert.assertEquals(1, devices.size());
return devices;
}
protected long getBdSeqNum() throws Exception {
if (bdSeq == 256) {
bdSeq = 0;
@ -207,16 +264,20 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
.setTimestamp(calendar.getTimeInMillis());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64));
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64, -1L));
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean));
payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean, -1L));
listKeys.add(keyNodeRebirth);
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType));
if (StringUtils.isNotBlank(metricKey)) {
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType, -1L));
} else {
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType, 4L));
}
listKeys.add(metricKey);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
return listKeys;
@ -297,7 +358,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
private TsKvEntry createdAddMetricTsKvLong(SparkplugBProto.Payload.Builder dataPayload, String key, Object value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, Long.valueOf(String.valueOf(value))));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L));
return tsKvEntry;
}
@ -305,7 +366,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
long ts, MetricDataType metricDataType) throws ThingsboardException {
Double dd = Double.parseDouble(Float.toString(value));
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new DoubleDataEntry(key, dd));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L));
return tsKvEntry;
}
@ -313,21 +374,21 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
long ts, MetricDataType metricDataType) throws ThingsboardException {
Long l = Double.valueOf(value).longValue();
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new LongDataEntry(key, l));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvBoolean(SparkplugBProto.Payload.Builder dataPayload, String key, boolean value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new BooleanDataEntry(key, value));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L));
return tsKvEntry;
}
private TsKvEntry createdAddMetricTsKvString(SparkplugBProto.Payload.Builder dataPayload, String key, String value,
long ts, MetricDataType metricDataType) throws ThingsboardException {
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(key, value));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(value, ts, key, metricDataType, -1L));
return tsKvEntry;
}
@ -348,7 +409,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
if (nodeArray.size() > 0) {
Optional<TsKvEntry> tsKvEntryOptional = Optional.of(new BasicTsKvEntry(ts, new JsonDataEntry(key, nodeArray.toString())));
if (tsKvEntryOptional.isPresent()) {
dataPayload.addMetrics(createMetric(values, ts, key, metricDataType));
dataPayload.addMetrics(createMetric(values, ts, key, metricDataType, -1L));
listTsKvEntry.add(tsKvEntryOptional.get());
listKeys.add(key);
}
@ -416,7 +477,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return java.util.UUID.randomUUID().toString();
}
public class SparkplugMqttCallback implements MqttCallback {
public class SparkplugMqttCallback implements MqttCallback {
private final List<SparkplugBProto.Payload.Metric> messageArrivedMetrics = new ArrayList<>();
@Override

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.transport.mqtt.sparkplug.attributes;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.thingsboard.server.common.data.Device;
@ -33,8 +32,10 @@ import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DDATA;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDATA;
/**
* Created by nickAS21 on 12.01.23
@ -50,7 +51,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Assert.assertTrue(listKeys.contains(keyNodeRebirth));
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}";
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.NBIRTH.name())
@ -76,7 +76,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Object metricValue = nextBoolean();
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
// Boolean <-> String
@ -140,7 +139,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Object metricValue = nextUInt32();
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
// Long <-> String
@ -192,7 +190,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Object metricValue = nextFloat(30, 400);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
// Float <-> String
@ -237,6 +234,67 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Assert.assertTrue(expectedValue == mqttCallback.getMessageArrivedMetrics().get(0).getFloatValue());
}
protected void processClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.ATTRIBUTES, 1);
// Node Edge
SparkplugMessageType messageType = NCMD;
String keyBirthNameNode = "Node Metric int32";
int valueBirthNameNode = 1024;
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameNode + "\":" + valueBirthNameNode + "}}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(keyBirthNameNode, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(valueBirthNameNode == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
messageType = NDATA;
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameNode + "\":" + valueBirthNameNode + "}}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(keyBirthNameNode, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(valueBirthNameNode == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
// Device
messageType = DCMD;
String keyBirthNameDevice = metricBirthName_Int32;
int valueBirthNameDevice = 123456;
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameDevice + "\":" + valueBirthNameDevice + "}}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + DCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(keyBirthNameDevice, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(valueBirthNameDevice == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
messageType = DDATA;
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + messageType.name() + "\": {\"" + keyBirthNameDevice + "\":" + valueBirthNameDevice + "}}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + DCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(keyBirthNameDevice, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(valueBirthNameDevice == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
}
protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
MetricDataType metricDataType = MetricDataType.Double;
@ -244,7 +302,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Object metricValue = nextDouble();
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
// Double <-> String
@ -296,7 +353,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Object metricValue = nextString();
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.ATTRIBUTES, 1);
// String <-> Long
@ -356,7 +412,6 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}

View File

@ -57,6 +57,10 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp
public void testClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk();
}
@Test
public void testClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk() throws Exception {
processClientWithCorrectAccessTokenPublishMetricDataTypeFromJson_SendValueOk();
}
@Test
public void testClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {

View File

@ -40,7 +40,7 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConn
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
/**
* Created by nickAS21 on 12.01.23
@ -80,7 +80,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
protected void processClientWithCorrectNodeAccessTokenNameSpaceInvalid_Test() throws Exception {
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long value = bdSeq = 0;
MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientConnectWithNDEATH(ts, value, "spBv1.2"));
MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientMqttV5ConnectWithNDEATH(ts, value, -1L,"spBv1.2"));
String expectedMessage = "Server unavailable.";
int expectedReasonCode = 136;
Assert.assertEquals(expectedMessage, actualException.getMessage());
@ -135,7 +135,7 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
if (client.isConnected()) {
List<Device> devicesList = new ArrayList<>(devices);
Device device = devicesList.get(indexDeviceDisconnect);
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(),
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(),
payloadDeathDevice.build().toByteArray(), 0, false);
await(alias + messageName(STATE) + ", device: " + device.getName())
.atMost(40, TimeUnit.SECONDS)

View File

@ -15,8 +15,8 @@
*/
package org.thingsboard.server.transport.mqtt.sparkplug.rpc;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
@ -32,7 +32,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
@Slf4j
public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest {
@ -45,7 +44,6 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1);
String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}";
String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway);
@ -63,9 +61,10 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie
public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.RPC, 1);
String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}";
String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0));
await(alias + NCMD.name())
await(alias + DCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
@ -75,12 +74,41 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
@Test
public void processClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices(ts);
awaitForDeviceActorToReceiveSubscription(devices.get(0).getId(), FeatureType.RPC, 1);
String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}";
String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0));
await(alias + DCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(expected, actual);
Assert.assertFalse(mqttCallback.getMessageArrivedMetrics().get(0).hasName());
Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).hasAlias());
Assert.assertTrue(2L == mqttCallback.getMessageArrivedMetrics().get(0).getAlias());
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
@Test
public void processClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS() throws Exception {
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long value = bdSeq = 0;
MqttException actualException = Assert.assertThrows(MqttException.class, () -> clientMqttV5ConnectWithNDEATH(ts, value, "",4L));
String expectedMessage = "Server unavailable.";
int expectedReasonCode = 136;
Assert.assertEquals(expectedMessage, actualException.getMessage());
Assert.assertEquals(expectedReasonCode, actualException.getReasonCode());
}
@Test
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1);
String invalidateTypeMessageName = "RCMD";
String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " +
@ -94,7 +122,6 @@ public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5Clie
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
awaitForDeviceActorToReceiveSubscription(savedGateway.getId(), FeatureType.RPC, 1);
String metricNameBad = metricBirthName_Int32 + "_Bad";
String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}";

View File

@ -47,6 +47,15 @@ public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest {
public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success();
}
@Test
public void testClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success() throws Exception {
processClientDeviceWithCorrectAccessTokenPublishWithAlias_TwoWayRpc_Success();
}
@Test
public void testClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS() throws Exception {
processClientNodeWithCorrectAccessTokenPublishWithAliasWithoutMetricName_TwoWayRpc_BAD_REQUEST_PARAMS();
}
@Test
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {

View File

@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
/**
* Created by nickAS21 on 12.01.23
@ -67,7 +67,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
createdAddMetricValuePrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}
@ -96,7 +96,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
createdAddMetricValueArraysPrimitiveTsKv(listTsKvEntry, listKeys, ndataPayload, ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
client.publish(TOPIC_ROOT_SPB_V_1_0 + "/" + groupId + "/" + messageTypeName + "/" + edgeNode,
ndataPayload.build().toByteArray(), 0, false);
}

View File

@ -28,6 +28,8 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
@ -84,6 +86,7 @@ import org.thingsboard.server.transport.mqtt.limits.SessionLimits;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugDeviceSessionContext;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
@ -101,6 +104,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
@ -120,9 +124,14 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProtoFromJsonNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic.parseTopic;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.parseTopicPublish;
/**
* @author Andrew Shvayka
@ -444,11 +453,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
/**
* It may be the case that an Edge Node has many dynamic associated devices.
* Publish: spBv1.0/G1/DBIRTH/E1/+
* Publish: spBv1.0/G1/DDATA/E1/+
* Publish: spBv1.0/G1/DCMD/E1/+
* Publish: spBv1.0/G1/DDEATH/E1/+
* @param ctx
* @param topicName
* @param mqttMsg
*/
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsg) {
int msgId = mqttMsg.variableHeader().packetId();
try {
SparkplugTopic sparkplugTopic = parseTopicPublish(topicName);
if (sparkplugTopic.isNode()) {
boolean isWildcardInPublish = topicName.contains("+");
if (!isWildcardInPublish && sparkplugTopic.isNode()) {
// A node topic
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload()));
switch (sparkplugTopic.getType()) {
@ -457,21 +478,28 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case NDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, sparkplugTopic);
break;
case NDEATH:
if (sparkplugSessionHandler.onValidateNDEATH(sparkplugBProtoNode)) {
doDisconnect();
break;
} else {
throw new ThingsboardException(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY + " of " + NDEATH.name() + " is not equals " +
SPARKPLUG_BD_SEQUENCE_NUMBER_KEY + " of " + NBIRTH.name(), ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
default:
}
} else {
// A device topic
SparkplugBProto.Payload sparkplugBProtoDevice = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsg.payload()));
switch (sparkplugTopic.getType()) {
case DBIRTH:
case DCMD:
case DDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic);
break;
case DDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId());
break;
default:
if (isWildcardInPublish) {
for (Entry<String, SparkplugDeviceSessionContext> entry : sparkplugSessionHandler.getDevices().entrySet()) {
String deviceName = entry.getKey();
SparkplugTopic sparkplugTopicDevice = sparkplugTopic;
sparkplugTopicDevice.updateDeviceIdPlus(deviceName);
handleSparkplugPublishDeviceMsg(sparkplugTopicDevice, msgId, mqttMsg, sparkplugBProtoDevice);
}
} else {
handleSparkplugPublishDeviceMsg(sparkplugTopic, msgId, mqttMsg, sparkplugBProtoDevice);
}
}
} catch (RuntimeException e) {
@ -484,6 +512,36 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
/**
* It may be the case that an Edge Node has many dynamic associated devices.
* Publish: spBv1.0/G1/DBIRTH/E1/+
* Publish: spBv1.0/G1/DDATA/E1/+
* Publish: spBv1.0/G1/DCMD/E1/+
* Publish: spBv1.0/G1/DDEATH/E1/+
* @param sparkplugTopic
* @param msgId
* @param mqttMsg
* @throws AdaptorException
* @throws ThingsboardException
* @throws InvalidProtocolBufferException
*/
private void handleSparkplugPublishDeviceMsg(SparkplugTopic sparkplugTopic, int msgId,
MqttPublishMessage mqttMsg, SparkplugBProto.Payload sparkplugBProtoDevice)
throws AdaptorException, ThingsboardException, InvalidProtocolBufferException {
// A device topic
switch (sparkplugTopic.getType()) {
case DBIRTH:
case DCMD:
case DDATA:
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, sparkplugTopic);
break;
case DDEATH:
sparkplugSessionHandler.onDeviceDisconnect(mqttMsg, sparkplugTopic.getDeviceId());
break;
default:
}
}
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
Matcher fwMatcher;
@ -786,7 +844,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
try {
if (sparkplugSessionHandler != null) {
sparkplugSessionHandler.handleSparkplugSubscribeMsg(grantedQoSList, subscription, reqQoS);
sparkplugSessionHandler.handleSparkplugSubscribeMsg(subscription);
activityReported = true;
} else {
switch (topic) {
@ -877,13 +935,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
registerSubQoS(topic, grantedQoSList, reqQoS);
}
public void processAttributesRpcSubscribeSparkplugNode(List<Integer> grantedQoSList, MqttQoS reqQoS) {
/**
* 3.0.0 Edge Node Session Establishment:
* ncmd-subscribe
* [tck-id-message-flow-edge-node-ncmd-subscribe] The MQTT client associated with the Edge
* Node MUST subscribe to a topic of the form spBv1.0/group_id/NCMD/edge_node_id where
* group_id is the Sparkplug Group ID and the edge_node_id is the Sparkplug Edge Node ID for
* this Edge Node. It MUST subscribe on this topic with a QoS of 1.
*/
public void processAttributesRpcSubscribeSparkplugNode() {
List<Integer> grantedQoSList = new ArrayList<>();
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionCtx.getSessionInfo())
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
.build(), null);
registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS);
registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, AT_LEAST_ONCE);
}
public void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
@ -1180,12 +1247,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void checkSparkplugNodeSession(MqttConnectMessage connectMessage, ChannelHandlerContext ctx, SessionMetaData sessionMetaData) {
try {
if (sparkplugSessionHandler == null) {
SparkplugTopic sparkplugTopicNode = validatedSparkplugTopicConnectedNode(connectMessage);
if (sparkplugTopicNode != null) {
SparkplugTopic sparkplugTopic = validatedSparkplugConnectedWillTopic(connectMessage);
if (sparkplugTopic != null) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopicNode);
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopicNode);
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, true, sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, sparkplugTopic);
sessionMetaData.setOverwriteActivityTime(true);
// ncmd-subscribe
processAttributesRpcSubscribeSparkplugNode();
} else {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName());
throw new ThingsboardException("Invalid request body", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
@ -1198,12 +1267,33 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private SparkplugTopic validatedSparkplugTopicConnectedNode(MqttConnectMessage connectMessage) throws ThingsboardException {
/**
* The Death Certificate topic and payload described here are not published as an MQTT message by a client,
* but provided as parameters within the MQTT CONNECT control packet when this Sparkplug Edge Node first establishes the MQTT Client session.
* - NDEATH message MUST be registered as a Will Message in the MQTT CONNECT packet.
* -- in the MQTT CONNECT packet The NDEATH message MUST set the MQTT Will QoS to 1.
* -- in the MQTT CONNECT packet The NDEATH message MUST set the MQTT Will Retained flag to false.
* -- If the MQTT client is using MQTT v3.1.1, the Edge Nodes MQTT CONNECT packet MUST set the Clean Session flag to true.
* -- If the MQTT client is using MQTT v5.0, the Edge Nodes MQTT CONNECT packet MUST set the Clean Start flag to true and the Session Expiry Interval to 0
* @param connectMessage
* @return
* @throws ThingsboardException
*/
private SparkplugTopic validatedSparkplugConnectedWillTopic(MqttConnectMessage connectMessage) throws ThingsboardException {
if (StringUtils.isNotBlank(connectMessage.payload().willTopic())
&& connectMessage.payload().willMessageInBytes() != null
&& connectMessage.payload().willMessageInBytes().length > 0) {
SparkplugTopic sparkplugTopicNode = parseTopicPublish(connectMessage.payload().willTopic());
SparkplugTopic sparkplugTopicNode = parseTopic(connectMessage.payload().willTopic());
if (NDEATH.equals(sparkplugTopicNode.getType())) {
if (connectMessage.variableHeader().willQos() != 1 || connectMessage.variableHeader().isWillRetain())
return null;
if (!connectMessage.variableHeader().isCleanSession()) return null;
int mqttVer = connectMessage.variableHeader().version();
if (mqttVer == 5) {
Object sessionExpiryIntervalObj = connectMessage.variableHeader().properties().isEmpty() ? null : connectMessage.variableHeader().properties().getProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL.value());
Integer sessionExpiryInterval = sessionExpiryIntervalObj == null ? null : ((IntegerProperty) sessionExpiryIntervalObj).value();
if (sessionExpiryInterval == null || sessionExpiryInterval != 0) return null;
}
return sparkplugTopicNode;
}
}
@ -1302,18 +1392,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to device", sessionId);
try {
if (sparkplugSessionHandler != null) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
SparkplugMessageType.NCMD);
log.trace("[{}] Received attributes update notification to sparkplug Edge Node", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProtoShared -> {
SparkplugMessageType messageType = NCMD;
TransportProtos.TsKvProto tsKvProto = tsKvProtoShared;
if ("JSON_V".equals(tsKvProtoShared.getKv().getType().name())) {
try {
messageType = SparkplugMessageType.parseMessageType(tsKvProtoShared.getKv().getKey());
tsKvProto = getTsKvProtoFromJsonNode(JacksonUtil.toJsonNode(tsKvProtoShared.getKv().getJsonV()), tsKvProtoShared.getTs());
} catch (ThingsboardException e) {
messageType = null;
log.error("Failed attributes update notification to sparkplug Edge Node [{}]. ", sparkplugSessionHandler.getSparkplugTopicNode().getEdgeNodeId(), e);
}
}
if (messageType != null && messageType.isSubscribe() && messageType.isNode()
&& sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), messageType);
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(sparkplugSessionHandler::writeAndFlush);
} else {
log.trace("Failed attributes update notification to sparkplug Edge Node [{}]. ", sparkplugSessionHandler.getSparkplugTopicNode().getEdgeNodeId());
}
});
} else {
@ -1322,7 +1424,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
}
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes update to MQTT msg", sessionId, e);
log.trace("[{}] Failed to convert device/Edge Node attributes update to MQTT msg", sessionId, e);
}
}
@ -1359,7 +1461,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.
ToServerRpcRequestMsg rpcRequestMsg) {
var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId());
DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration();

View File

@ -315,6 +315,15 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
log.trace("[{}][{}][{}] First got or created device [{}], type [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
/**
* 3.0.0 Device Session Establishment:
* dcmd-subscribe
* [tck-id-message-flow-device-dcmd-subscribe] If the Device supports writing to outputs, the
* MQTT client associated with the Device MUST subscribe to a topic of the form
* spBv1.0/group_id/DCMD/edge_node_id/device_id where group_id is the Sparkplug Group ID
* the edge_node_id is the Sparkplug Edge Node ID and the device_id is the Sparkplug Device ID
* for this Device. It MUST subscribe on this topic with a QoS of 1
*/
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
.setSessionInfo(deviceSessionInfo)
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
@ -736,6 +745,10 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg));
}
public ConcurrentMap<String, T> getDevices () {
return this.devices;
}
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final String deviceName, final int msgId, final T msg) {
return new TransportServiceCallback<Void>() {
@Override

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
@ -33,9 +34,10 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProtoFromJsonNode;
@Slf4j
public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext<SparkplugNodeSessionHandler> {
@ -51,27 +53,49 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC
super(parent, deviceInfo, deviceProfile, mqttQoSMap, transportService);
}
public Map<String, SparkplugBProto.Payload.Metric> getDeviceBirthMetrics() {
public Map<String, SparkplugBProto.Payload.Metric> getDeviceBirthMetrics() {
return deviceBirthMetrics;
}
public void setDeviceBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.deviceBirthMetrics.putAll(metrics.stream()
.collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric)));
public void setDeviceBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
for (var metric : metrics) {
if (metric.hasName()) {
this.deviceBirthMetrics.put(metric.getName(), metric);
} else {
throw new IllegalArgumentException("The metric name of device: '" + this.getDeviceInfo().getDeviceName() + "' must not be empty or null! Metric: [" + metric + "]");
}
if (metric.hasAlias() && this.parent.getNodeAlias().putIfAbsent(metric.getAlias(), metric.getName()) != null) {
throw new DuplicateKeyException("The alias '" + metric.getAlias() + "' already exists in device: '" + this.getDeviceInfo().getDeviceName() + "'");
}
}
}
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
notification.getSharedUpdatedList().forEach(tsKvProtoShared -> {
SparkplugMessageType messageType = DCMD;
TransportProtos.TsKvProto tsKvProto = tsKvProtoShared;
if ("JSON_V".equals(tsKvProtoShared.getKv().getType().name())) {
try {
messageType = SparkplugMessageType.parseMessageType(tsKvProtoShared.getKv().getKey());
tsKvProto = getTsKvProtoFromJsonNode(JacksonUtil.toJsonNode(tsKvProtoShared.getKv().getJsonV()), tsKvProtoShared.getTs());
} catch (ThingsboardException e) {
messageType = null;
log.error("Failed attributes update notification to sparkplug device [{}]. ", deviceInfo.getDeviceName(), e);
}
}
if (messageType != null && messageType.isSubscribe() && messageType.isDevice()
&& getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
SparkplugMessageType.DCMD, deviceInfo.getDeviceName());
messageType, deviceInfo.getDeviceName());
parent.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
sparkplugTopic.toString(),
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(this.parent::writeAndFlush);
} else {
log.trace("Failed attributes update notification to sparkplug device [{}]. ", deviceInfo.getDeviceName());
}
});
}
@ -81,20 +105,22 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC
log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId);
try {
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
header.setMessageType(messageType.name());
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
messageType, deviceInfo.getDeviceName());
parent.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo));
} else {
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
if (messageType.isSubscribe()) {
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
header.setMessageType(messageType.name());
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
messageType, deviceInfo.getDeviceName());
parent.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo));
} else {
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
}
}
} catch (ThingsboardException e) {
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),

View File

@ -18,19 +18,15 @@ package org.thingsboard.server.transport.mqtt.session;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.protobuf.Descriptors;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.adaptor.ProtoConverter;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
@ -41,7 +37,9 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SpecVersion;
import java.util.ArrayList;
import java.util.List;
@ -50,26 +48,32 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_BD_SEQUENCE_NUMBER_KEY;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.SPARKPLUG_SEQUENCE_NUMBER_KEY;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicSubscribe;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_STATE_REGEXP;
/**
* Created by nickAS21 on 12.12.22
*/
@Slf4j
@SpecVersion(spec = "sparkplug", version = "3.0.0")
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<SparkplugDeviceSessionContext> {
@Getter
private final SparkplugTopic sparkplugTopicNode;
@Getter
private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics;
@Getter
private final Map<Long, String> nodeAlias;
private final MqttTransportHandler parent;
public SparkplugNodeSessionHandler(MqttTransportHandler parent, DeviceSessionCtx deviceSessionCtx, UUID sessionId,
@ -78,25 +82,29 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
this.parent = parent;
this.sparkplugTopicNode = sparkplugTopicNode;
this.nodeBirthMetrics = new ConcurrentHashMap<>();
this.nodeAlias = new ConcurrentHashMap<>();
}
public void setNodeBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.nodeBirthMetrics.putAll(metrics.stream()
.collect(Collectors.toMap(SparkplugBProto.Payload.Metric::getName, metric -> metric)));
}
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = getBytes(inbound.payload());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = ProtoConverter.validateDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
try {
return JsonConverter.convertToTelemetryProto(JsonParser.parseString(ProtoConverter.dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
} catch (Exception e) {
log.debug("Failed to decode post telemetry request", e);
throw new AdaptorException(e);
public void setNodeBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) throws AdaptorException {
for (var metric : metrics) {
if (metric.hasName()) {
this.nodeBirthMetrics.put(metric.getName(), metric);
} else {
throw new AdaptorException("The metric name of edgeNode: '" + this.sparkplugTopicNode.getEdgeNodeId() + "' must not be empty or null! Metric: [" + metric + "]");
}
if (metric.hasAlias() && this.nodeAlias.putIfAbsent(metric.getAlias(), metric.getName()) != null) {
throw new AdaptorException("The alias '" + metric.getAlias() + "' already exists in edgeNode: '" + this.sparkplugTopicNode.getEdgeNodeId() + "'");
}
}
}
public boolean onValidateNDEATH(SparkplugBProto.Payload sparkplugBProto) throws ThingsboardException {
return sparkplugBProto.getMetricsCount() == 1 && SPARKPLUG_BD_SEQUENCE_NUMBER_KEY.equals(sparkplugBProto.getMetrics(0).getName())
&& this.nodeBirthMetrics.get(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY) != null
&& sparkplugBProto.getMetrics(0).getLongValue() == this.nodeBirthMetrics.get(SPARKPLUG_BD_SEQUENCE_NUMBER_KEY).getLongValue();
}
public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
String deviceName = topic.getNodeDeviceName();
checkDeviceName(deviceName);
@ -115,7 +123,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
if (topic.isType(DBIRTH)) {
sendSparkplugStateOnTelemetry(ctx.getSessionInfo(), deviceName, ONLINE,
sparkplugBProto.getTimestamp());
ctx.setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
try {
ctx.setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
} catch (IllegalArgumentException | DuplicateKeyException e) {
throw new RuntimeException(e);
}
}
return ctx;
}, MoreExecutors.directExecutor());
@ -168,19 +180,21 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
public void handleSparkplugSubscribeMsg(List<Integer> grantedQoSList, MqttTopicSubscription subscription,
MqttQoS reqQoS) throws ThingsboardException {
SparkplugTopic sparkplugTopic = parseTopicSubscribe(subscription.topicFilter());
if (sparkplugTopic.getGroupId() == null) {
// TODO SUBSCRIBE NameSpace
} else if (sparkplugTopic.getType() == null) {
// TODO SUBSCRIBE GroupId
} else if (sparkplugTopic.isNode()) {
// SUBSCRIBE Node
parent.processAttributesRpcSubscribeSparkplugNode(grantedQoSList, reqQoS);
/**
* Subscribe: spBv1.0/STATE/my_primary_hos -> Implemented as status via checkSparkplugNodeSession
* Subscribe: CMD/DATA -> Implemented after connection: SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG/SUBSCRIBE_TO_RPC_ASYNC_MSG
* @param subscription
* @throws ThingsboardException
*/
public void handleSparkplugSubscribeMsg(MqttTopicSubscription subscription) throws ThingsboardException {
String topic = subscription.topicFilter();
if (topic != null && topic.startsWith(TOPIC_STATE_REGEXP)) {
log.trace("Subscribing on its own spBv1.0/STATE/[the Sparkplug Host Application] - Implemented as status via checkSparkplugNodeSession");
} else if (this.validateTopicDataSubscribe(topic)) {
// TODO if need subscription DATA
log.trace("Subscribing on its own [" + topic + "] - Implemented as SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG/SUBSCRIBE_TO_RPC_ASYNC_MSG via checkSparkplugNode/DeviceSession");
} else {
// SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED.
// TODO: track that node subscribed to # or to particular device.
log.trace("Failed to subscribe to the topic: [" + topic + "].");
}
}
@ -192,7 +206,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
private ListenableFuture<SparkplugDeviceSessionContext> onDeviceConnectProto(SparkplugTopic topic) throws ThingsboardException {
private ListenableFuture<SparkplugDeviceSessionContext> onDeviceConnectProto(SparkplugTopic topic) throws
ThingsboardException {
try {
String deviceType = this.gateway.getDeviceType() + " device";
return onDeviceConnect(topic.getNodeDeviceName(), deviceType);
@ -202,22 +217,45 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set<String> attributesMetricNames, String topicTypeName) throws AdaptorException {
/**
* Sparkplug 3.0.0 -> 6.4.6. Metric
* https://sparkplug.eclipse.org/specification/version/3.0/documents/sparkplug-specification-3.0.0.pdf#%5B%7B%22num%22%3A339%2C%22gen%22%3A0%7D%2C%7B%22name%22%3A%22XYZ%22%7D%2C0%2C455.52%2Cnull%5D
* [tck-id-payloads-name-requirement] The name MUST be included with every metric unless aliases are being used. All UTF-8 characters are allowed in the metric name. However, special characters including but not limited to the following are discouraged: . , \ @ # $ % ^ & * ( ) [ ] { } | ! ` ~ : ; ' " < > ?. This is because many Sparkplug Host Applications may have issues handling them.
* alias (are optional and not required):
* - This is an unsigned 64-bit integer representing an optional alias for a Sparkplug B payload.
* - If aliases are used, the following rules apply:
* -- [tck-id-payloads-alias-uniqueness] If supplied in an NBIRTH or BIRTH it MUST be a unique number across this Edge Nodes entire set of metrics.
* -- no two metrics for the same Edge Node can have the same alias.
* -- [tck-id-payloads-alias-birth-requirement] NBIRTH and DBIRTH messages MUST include both a metric name and alias.
* -- [tck-id-payloads-alias-data-cmd-requirement] NDATA, DDATA, NCMD, and DCMD messages MUST only include an alias and the metric name MUST be excluded.
* @param sparkplugBProto
* @param attributesMetricNames
* @param topicTypeName
* @return
* @throws AdaptorException
*/
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload
sparkplugBProto, Set<String> attributesMetricNames, String topicTypeName) throws AdaptorException {
try {
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) {
long ts = protoMetric.getTimestamp();
String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts)));
String metricName = protoMetric.hasName() ? protoMetric.getName() : protoMetric.hasAlias() ? this.nodeAlias.get(protoMetric.getAlias()) : null;
if (metricName == null) {
throw new ThingsboardException("Metric without metricName and alias.", ThingsboardErrorCode.INVALID_ARGUMENTS);
} else {
if (attributesMetricNames == null || !matches(attributesMetricNames, metricName)) {
long ts = protoMetric.getTimestamp();
String key = SPARKPLUG_BD_SEQUENCE_NUMBER_KEY.equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
keyValueProtoOpt.ifPresent(kvProto -> msgs.add(postTelemetryMsgCreated(kvProto, ts)));
}
}
}
if (DBIRTH.name().equals(topicTypeName)) {
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
keyValueProtoBuilder.setKey(topicTypeName + " " + "seq");
keyValueProtoBuilder.setKey(topicTypeName + " " + SPARKPLUG_SEQUENCE_NUMBER_KEY);
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V);
keyValueProtoBuilder.setLongV(sparkplugBProto.getSeq());
msgs.add(postTelemetryMsgCreated(keyValueProtoBuilder.build(), sparkplugBProto.getTimestamp()));
@ -235,13 +273,18 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
try {
List<TransportApiProtos.AttributesMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
if (matches(attributesMetricNames, protoMetric)) {
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric);
if (msgOpt.isPresent()) {
deviceAttributesMsgBuilder.setDeviceName(deviceName);
deviceAttributesMsgBuilder.setMsg(msgOpt.get());
msgs.add(deviceAttributesMsgBuilder.build());
String metricName = protoMetric.hasName() ? protoMetric.getName() : protoMetric.hasAlias() ? this.nodeAlias.get(protoMetric.getAlias()) : null;
if (metricName == null) {
throw new ThingsboardException("Metric without metricName and alias.", ThingsboardErrorCode.INVALID_ARGUMENTS);
} else {
if (matches(attributesMetricNames, metricName)) {
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric);
if (msgOpt.isPresent()) {
deviceAttributesMsgBuilder.setDeviceName(deviceName);
deviceAttributesMsgBuilder.setMsg(msgOpt.get());
msgs.add(deviceAttributesMsgBuilder.build());
}
}
}
}
@ -252,8 +295,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
private boolean matches(Set<String> attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) {
String metricName = protoMetric.getName();
private boolean matches(Set<String> attributesMetricNames, String metricName) {
for (String attributeMetricFilter : attributesMetricNames) {
if (metricName.equals(attributeMetricFilter) ||
(attributeMetricFilter.endsWith("*") && metricName.startsWith(
@ -264,7 +306,8 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
return false;
}
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric
protoMetric) throws ThingsboardException {
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric);
if (keyValueProtoOpt.isPresent()) {
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder();
@ -285,7 +328,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
if (value.isPresent()) {
SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts);
cmdPayload.addMetrics(createMetric(value.get(), ts, tsKvProto.getKv().getKey(), metricDataType));
String metricName = tsKvProto.getKv().getKey();
Long alias = metricBirth.hasAlias() ? metricBirth.getAlias() : -1;
cmdPayload.addMetrics(createMetric(value.get(), ts, alias == -1 ? metricName : null, metricDataType, alias));
byte[] payloadInBytes = cmdPayload.build().toByteArray();
return Optional.of(getPayloadAdaptor().createMqttPublishMsg(deviceSessionCtx, sparkplugTopic, payloadInBytes));
} else {
@ -304,16 +349,37 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg
rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo);
}
protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {
protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo,
int requestId, ThingsboardErrorCode result, String errorMsg) {
parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg);
}
protected void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) {
parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg);
/**
* Subscribe: spBv1.0/G1/DDATA/E1
* Subscribe: spBv1.0/G1/DDATA/E1/#
* Subscribe: spBv1.0/G1/DDATA/E1/+
* Subscribe: spBv1.0/G1/DDATA/E1/D1
* Subscribe: spBv1.0/G1/DDATA/E1/D1/#
* Subscribe: spBv1.0/G1/DDATA/E1/D1/+
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
* @param topic a topic UTF-8
* @return a {@link SparkplugTopic} instance
* @throws ThingsboardException if an error occurs while parsing
*/
public boolean validateTopicDataSubscribe(String topic) throws ThingsboardException {
String[] splitTopic = topic.split(TOPIC_SPLIT_REGEXP);
if (splitTopic.length >= 4 && splitTopic.length <= 5 &&
splitTopic[0].equals(this.sparkplugTopicNode.getNamespace()) &&
splitTopic[1].equals(this.sparkplugTopicNode.getGroupId()) &&
splitTopic[3].equals(this.sparkplugTopicNode.getEdgeNodeId())) {
SparkplugMessageType messageType = parseMessageType(splitTopic[2]);
return messageType.isData();
}
return false;
}
}

View File

@ -0,0 +1,77 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
public class DeviceDescriptor extends EdgeNodeDescriptor {
private final String deviceId;
private final String descriptorString;
public DeviceDescriptor(String groupId, String edgeNodeId, String deviceId) {
super(groupId, edgeNodeId);
this.deviceId = deviceId;
this.descriptorString = groupId + "/" + edgeNodeId + "/" + deviceId;
}
public DeviceDescriptor(String descriptorString) {
super(descriptorString.substring(0, descriptorString.lastIndexOf("/")));
this.deviceId = descriptorString.substring(descriptorString.lastIndexOf("/") + 1);
this.descriptorString = descriptorString;
}
public DeviceDescriptor(EdgeNodeDescriptor edgeNodeDescriptor, String deviceId) {
super(edgeNodeDescriptor.getGroupId(), edgeNodeDescriptor.getEdgeNodeId());
this.deviceId = deviceId;
this.descriptorString = edgeNodeDescriptor.getDescriptorString() + "/" + deviceId;
}
public String getDeviceId() {
return deviceId;
}
/**
* Returns a {@link String} representing the Device's Descriptor of the form:
* "<groupName>/<edgeNodeName>/<deviceId>".
*
* @return a {@link String} representing the Device's Descriptor.
*/
@Override
public String getDescriptorString() {
return descriptorString;
}
public String getEdgeNodeDescriptorString() {
return super.getDescriptorString();
}
@Override
public int hashCode() {
return this.getDescriptorString().hashCode();
}
@Override
public boolean equals(Object object) {
if (object instanceof DeviceDescriptor) {
return this.getDescriptorString().equals(((DeviceDescriptor) object).getDescriptorString());
}
return this.getDescriptorString().equals(object);
}
@Override
public String toString() {
return getDescriptorString();
}
}

View File

@ -0,0 +1,80 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonValue;
public class EdgeNodeDescriptor implements SparkplugDescriptor{
private final String groupId;
private final String edgeNodeId;
private final String descriptorString;
public EdgeNodeDescriptor(String groupId, String edgeNodeId) {
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.descriptorString = groupId + "/" + edgeNodeId;
}
/**
* Creates and EdgeNodeDescriptor from a {@link String} of the form group_name/edge_node_name
*
* @param descriptorString the {@link String} representation of an EdgeNodeDescriptor
*/
public EdgeNodeDescriptor(String descriptorString) {
String[] tokens = descriptorString.split("/");
this.groupId = tokens[0];
this.edgeNodeId = tokens[1];
this.descriptorString = descriptorString;
}
public String getGroupId() {
return groupId;
}
public String getEdgeNodeId() {
return edgeNodeId;
}
/**
* Returns a {@link String} representing the Edge Node's Descriptor of the form: "<groupId>/<edgeNodeId>".
*
* @return a {@link String} representing the Edge Node's Descriptor.
*/
@Override
public String getDescriptorString() {
return descriptorString;
}
@Override
public int hashCode() {
return this.getDescriptorString().hashCode();
}
@Override
public boolean equals(Object object) {
if (object instanceof EdgeNodeDescriptor) {
return this.getDescriptorString().equals(((EdgeNodeDescriptor) object).getDescriptorString());
}
return this.getDescriptorString().equals(object);
}
@Override
@JsonValue
public String toString() {
return getDescriptorString();
}
}

View File

@ -52,7 +52,24 @@ public enum MetricDataType {
File(18, SparkplugMetricUtil.File.class),
Template(19, SparkplugBProto.Payload.Template.class),
// PropertyValue Types (20 and 21) are NOT metric datatypes
// Additional PropertyValue Types (PropertyValue Types (20 and 21) are NOT metric datatypes)
PropertySet(20, SparkplugBProto.Payload.PropertySet.class),
PropertySetList(21, SparkplugBProto.Payload.PropertySetList.class),
// Array Types
Int8Array(22, Byte[].class),
Int16Array(23, Short[].class),
Int32Array(24, Integer[].class),
Int64Array(25, Long[].class),
UInt8Array(26, Short[].class),
UInt16Array(27, Integer[].class),
UInt32Array(28, Long[].class),
UInt64Array(29, BigInteger[].class),
FloatArray(30, Float[].class),
DoubleArray(31, Double[].class),
BooleanArray(32, Boolean[].class),
StringArray(33, String[].class),
DateTimeArray(34, Date[].class),
// Unknown
Unknown(0, Object.class);
@ -140,6 +157,36 @@ public enum MetricDataType {
return File;
case 19:
return Template;
case 20:
return PropertySet;
case 21:
return PropertySetList;
case 22:
return Int8Array;
case 23:
return Int16Array;
case 24:
return Int32Array;
case 25:
return Int64Array;
case 26:
return UInt8Array;
case 27:
return UInt16Array;
case 28:
return UInt32Array;
case 29:
return UInt64Array;
case 30:
return FloatArray;
case 31:
return DoubleArray;
case 32:
return BooleanArray;
case 33:
return StringArray;
case 34:
return DateTimeArray;
default:
return Unknown;
}

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
public interface SparkplugDescriptor {
/**
* Returns the String representation of this {@link SparkplugDescriptor}
*
* @return the String representation of this {@link SparkplugDescriptor}
*/
public String getDescriptorString();
}

View File

@ -91,10 +91,14 @@ public enum SparkplugMessageType {
return STATE.equals(type) ? "sparkplugConnectionState" : type.name();
}
public boolean isState() {
return this.equals(STATE);
}
public boolean isDeath() {
return this.equals(DDEATH) || this.equals(NDEATH);
}
public boolean isCommand() {
return this.equals(DCMD) || this.equals(NCMD);
}
@ -110,4 +114,19 @@ public enum SparkplugMessageType {
public boolean isRecord() {
return this.equals(DRECORD) || this.equals(NRECORD);
}
public boolean isSubscribe() {
return isCommand() || isData() || isRecord();
}
public boolean isNode() {
return this.equals(NBIRTH)
|| this.equals(NCMD) || this.equals(NDATA)
||this.equals(NDEATH) || this.equals(NRECORD);
}
public boolean isDevice() {
return this.equals(DBIRTH)
|| this.equals(DCMD) || this.equals(DDATA)
||this.equals(DDEATH) || this.equals(DRECORD);
}
}

View File

@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric.Builder;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@ -45,6 +47,9 @@ import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
@Slf4j
public class SparkplugMetricUtil {
public static final String SPARKPLUG_SEQUENCE_NUMBER_KEY = "seq";
public static final String SPARKPLUG_BD_SEQUENCE_NUMBER_KEY = "bdSeq";
public static Optional<TransportProtos.KeyValueProto> fromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
// Check if the null flag has been set indicating that the value is null
if (protoMetric.getIsNull()) {
@ -141,13 +146,20 @@ public class SparkplugMetricUtil {
return Optional.empty();
}
}
public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType, Long alias) throws ThingsboardException {
Builder metric = Metric.newBuilder();
metric.setTimestamp(ts)
.setDatatype(metricDataType.toIntValue());
if (alias >= 0) {
metric.setAlias(alias);
}
if (StringUtils.isNotBlank(key)) {
metric.setName(key);
}
return addToMetricValue(value, metric.build(), metricDataType);
}
public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException {
SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder()
.setTimestamp(ts)
.setName(key)
.setDatatype(metricDataType.toIntValue())
.build();
public static SparkplugBProto.Payload.Metric addToMetricValue(Object value, SparkplugBProto.Payload.Metric metric, MetricDataType metricDataType) throws ThingsboardException {
switch (metricDataType) {
case Int8: // (byte)
return metric.toBuilder().setIntValue(((Byte) value).intValue()).build();
@ -189,6 +201,12 @@ public class SparkplugMetricUtil {
return metric;
}
public static TransportProtos.TsKvProto getTsKvProtoFromJsonNode(JsonNode kvProto, long ts) throws ThingsboardException {
String kvProtoKey = kvProto.fieldNames().next();
String kvProtoValue = kvProto.get(kvProtoKey).asText();
return getTsKvProto(kvProtoKey, kvProtoValue, ts);
}
public static TransportProtos.TsKvProto getTsKvProto(String key, Object value, long ts) throws ThingsboardException {
try {
TransportProtos.TsKvProto.Builder tsKvProtoBuilder = TransportProtos.TsKvProto.newBuilder();

View File

@ -15,65 +15,129 @@
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.parseMessageType;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_ROOT_SPB_V_1_0;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicService.TOPIC_SPLIT_REGEXP;
/**
* Created by nickAS21 on 12.12.22
* A Sparkplug MQTT Topic
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class SparkplugTopic {
/**
* The Sparkplug namespace version.
* For the Sparkplug B version of the specification, the UTF-8 string constant for the namespace element will be: spBv1.0
*/
private String namespace;
private final String namespace;
/**
* The SparkplugDesciptor for this Edge Node or Device
*/
@JsonIgnore
private final SparkplugDescriptor sparkplugDescriptor;
/**
* The {@link EdgeNodeDescriptor} for this Edge Node or Device
*/
private final EdgeNodeDescriptor edgeNodeDescriptor;
/**
* The ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*/
private String groupId;
private final String groupId;
/**
* The ID of the Edge of Network (EoN) Node.
*/
private String edgeNodeId;
private final String edgeNodeId;
/**
* The ID of the device.
*/
private String deviceId;
/**
* The ID if this is a Sparkplug Host Application topic
*/
private final String hostApplicationId;
/**
* The message type.
*/
private SparkplugMessageType type;
private final SparkplugMessageType type;
/**
* Constructor (device).
public SparkplugTopic() {
this.namespace = null;
this.sparkplugDescriptor = null;
this.edgeNodeDescriptor = null;
this.groupId = null;
this.edgeNodeId = null;
this.deviceId = null;
this.hostApplicationId = null;
this.type = null;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.sparkplugDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.deviceId = null;
this.type = type;
this.hostApplicationId = null;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.deviceId = deviceId;
this.sparkplugDescriptor = deviceId == null
? new EdgeNodeDescriptor(groupId, edgeNodeId)
: new DeviceDescriptor(groupId, edgeNodeId, deviceId);
this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.type = type;
this.hostApplicationId = null;
}
/**
* A Constructor for Device Topics
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param deviceId the device ID.
* @param type the message type.
* @param namespace the namespace
* @param groupId the Group ID
* @param edgeNodeId the Edge Node ID
* @param deviceId the Device ID
* @param type the message type
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, String deviceId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.sparkplugDescriptor = deviceId == null
? new EdgeNodeDescriptor(groupId, edgeNodeId)
: new DeviceDescriptor(groupId, edgeNodeId, deviceId);
this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = deviceId;
this.hostApplicationId = null;
this.type = type;
}
/**
* Constructor (node).
*
* @param namespace the namespace.
* @param groupId the group ID.
* @param edgeNodeId the edge node ID.
* @param type the message type.
* A Constructor for Edge Node Topics
* @param namespace the namespace
* @param groupId the group ID
* @param edgeNodeId the edge node ID
* @param type the message type
*/
public SparkplugTopic(String namespace, String groupId, String edgeNodeId, SparkplugMessageType type) {
super();
@ -81,33 +145,110 @@ public class SparkplugTopic {
this.groupId = groupId;
this.edgeNodeId = edgeNodeId;
this.deviceId = null;
this.type = type;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.deviceId = null;
this.type = type;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.deviceId = deviceId;
this.sparkplugDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.edgeNodeDescriptor = new EdgeNodeDescriptor(groupId, edgeNodeId);
this.hostApplicationId = null;
this.type = type;
}
/**
* @return the Sparkplug namespace version
* A Constructor for Device Topics
*
* @param namespace the namespace
* @param deviceDescriptor the {@link EdgeNodeDescriptor}
* @param type the message type
*/
public SparkplugTopic(String namespace, DeviceDescriptor deviceDescriptor, SparkplugMessageType type) {
this(namespace, deviceDescriptor.getGroupId(), deviceDescriptor.getEdgeNodeId(), deviceDescriptor.getDeviceId(),
type);
}
/**
* A Constructor for Edge Node Topics
*
* @param namespace the namespace
* @param edgeNodeDescriptor the {@link EdgeNodeDescriptor}
* @param type the message type
*/
public SparkplugTopic(String namespace, EdgeNodeDescriptor edgeNodeDescriptor, SparkplugMessageType type) {
this(namespace, edgeNodeDescriptor.getGroupId(), edgeNodeDescriptor.getEdgeNodeId(), type);
}
/**
* A Constructor for Host Application Topics
*
* @param namespace the namespace
* @param hostApplicationId the Host Application ID
*/
public SparkplugTopic(String namespace, String hostApplicationId, SparkplugMessageType type) {
super();
this.namespace = namespace;
this.hostApplicationId = hostApplicationId;
this.type = type;
this.sparkplugDescriptor = null;
this.edgeNodeDescriptor = null;
this.groupId = null;
this.edgeNodeId = null;
this.deviceId = null;
}
public static SparkplugTopic parseTopic(String topicString) throws ThingsboardException {
try {
if (isValidIdElementToUTF8(topicString)) {
SparkplugMessageType messageType;
String[] splitTopic = topicString.split(TOPIC_SPLIT_REGEXP);
if (TOPIC_ROOT_SPB_V_1_0.equals(splitTopic[0])) {
if (splitTopic.length == 3) {
messageType = parseMessageType(splitTopic[1]);
if (messageType.isState())
return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[2], messageType);
} else if (splitTopic.length == 4) {
messageType = parseMessageType(splitTopic[2]);
if (messageType.isNode())
return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[1], splitTopic[3], messageType);
} else if (splitTopic.length == 5) {
messageType = parseMessageType(splitTopic[2]);
if (messageType.isDevice())
return new SparkplugTopic(TOPIC_ROOT_SPB_V_1_0, splitTopic[1], splitTopic[3], splitTopic[4], messageType);
}
}
}
throw new ThingsboardException("Invalid Sparkplug topic from String: " + topicString, ThingsboardErrorCode.INVALID_ARGUMENTS);
} catch (
Exception e) {
throw new ThingsboardException(e, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
}
/**
* Returns the Sparkplug namespace version.
*
* @return the namespace
*/
public String getNamespace() {
return namespace;
}
/**
* Returns the {@link SparkplugDescriptor}
*
* @return the SparkplugDescriptor
*/
public SparkplugDescriptor getSparkplugDescriptor() {
return sparkplugDescriptor;
}
/**
* Returns the {@link EdgeNodeDescriptor}
*
* @return the EdgeNodeDescriptor
*/
public EdgeNodeDescriptor getEdgeNodeDescriptor() {
return edgeNodeDescriptor;
}
/**
* Returns the ID of the logical grouping of Edge of Network (EoN) Nodes and devices.
*
@ -118,20 +259,39 @@ public class SparkplugTopic {
}
/**
* @return the ID of the Edge of Network (EoN) Node
* Returns the ID of the Edge of Network (EoN) Node.
*
* @return the edge node ID
*/
public String getEdgeNodeId() {
return edgeNodeId;
}
/**
* Returns the ID of the device.
*
* @return the device ID
*/
public String getDeviceId() {
return deviceId;
}
public void updateDeviceIdPlus(String deviceIdNew) {
this.deviceId = this.deviceId.equals("+") ? deviceIdNew : this.deviceId;
}
/**
* Returns the Host Application ID if this is a Host topic
*
* @return the Host Application ID
*/
public String getHostApplicationId() {
return hostApplicationId;
}
/**
* Returns the message type.
*
* @return the message type
*/
public SparkplugMessageType getType() {
@ -140,12 +300,15 @@ public class SparkplugTopic {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(getNamespace()).append("/")
.append(getGroupId()).append("/")
.append(getType()).append("/")
.append(getEdgeNodeId());
if (getDeviceId() != null) {
sb.append("/").append(getDeviceId());
StringBuilder sb = new StringBuilder();
if (hostApplicationId == null) {
sb.append(getNamespace()).append("/").append(getGroupId()).append("/").append(getType()).append("/")
.append(getEdgeNodeId());
if (getDeviceId() != null) {
sb.append("/").append(getDeviceId());
}
} else {
sb.append(getNamespace()).append("/").append(getType()).append("/").append(hostApplicationId);
}
return sb.toString();
}
@ -165,5 +328,13 @@ public class SparkplugTopic {
public String getNodeDeviceName() {
return isNode() ? edgeNodeId : deviceId;
}
public static boolean isValidIdElementToUTF8(String deviceIdElement) {
if (deviceIdElement == null) {
return false;
}
String regex = "^(?!.*//)[^+#]*$";
return deviceIdElement.matches(regex);
}
}

View File

@ -0,0 +1,67 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent;
import java.util.HashMap;
import java.util.Map;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic.parseTopic;
@Slf4j
@Service
@TbMqttTransportComponent
public class SparkplugTopicService {
private static final Map<String, SparkplugTopic> SPLIT_TOPIC_CACHE = new HashMap<>();
public static final String TOPIC_ROOT_SPB_V_1_0 = "spBv1.0";
public static final String TOPIC_ROOT_CERT_SP = "$sparkplug/certificates/";
public static final String TOPIC_SPLIT_REGEXP = "/";
public static final String TOPIC_STATE_REGEXP = TOPIC_ROOT_SPB_V_1_0 + TOPIC_SPLIT_REGEXP + STATE.name() + TOPIC_SPLIT_REGEXP;
public static SparkplugTopic getSplitTopic(String topic) throws ThingsboardException {
SparkplugTopic sparkplugTopic = SPLIT_TOPIC_CACHE.get(topic);
if (sparkplugTopic == null) {
// validation topic
sparkplugTopic = parseTopic(topic);
SPLIT_TOPIC_CACHE.put(topic, sparkplugTopic);
}
return sparkplugTopic;
}
/**
* all ID Element MUST be a UTF-8 string
* and with the exception of the reserved characters of + (plus), / (forward slash).
* Publish: $sparkplug/certificates/spBv1.0/G1/NBIRTH/E1
* Publish: spBv1.0/G1/NBIRTH/E1
* Publish: $sparkplug/certificates/spBv1.0/G1/DBIRTH/E1/D1
* Publish: spBv1.0/G1/DBIRTH/E1/D1
* @param topic
* @return
* @throws ThingsboardException
*/
public static SparkplugTopic parseTopicPublish(String topic) throws ThingsboardException {
topic = topic.startsWith(TOPIC_ROOT_CERT_SP) ? topic.substring(TOPIC_ROOT_CERT_SP.length()) : topic;
topic = topic.indexOf("+") > 0 ? topic.substring(0, topic.indexOf("+")): topic;
return getSplitTopic(topic);
}
}

View File

@ -1,116 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import java.util.HashMap;
import java.util.Map;
/**
* Provides utility methods for handling Sparkplug MQTT message topics.
*/
public class SparkplugTopicUtil {
private static final Map<String, String[]> SPLIT_TOPIC_CACHE = new HashMap<String, String[]>();
private static final String TOPIC_INVALID_NUMBER = "Invalid number of topic elements: ";
public static final String NAMESPACE = "spBv1.0";
public static String[] getSplitTopic(String topic) {
String[] splitTopic = SPLIT_TOPIC_CACHE.get(topic);
if (splitTopic == null) {
splitTopic = topic.split("/");
SPLIT_TOPIC_CACHE.put(topic, splitTopic);
}
return splitTopic;
}
/**
* Serializes a {@link SparkplugTopic} instance in to a JSON string.
*
* @param topic a {@link SparkplugTopic} instance
* @return a JSON string
* @throws JsonProcessingException
*/
public static String sparkplugTopicToString(SparkplugTopic topic) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(topic);
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param topic a topic string
* @return a {@link SparkplugTopic} instance
* @throws ThingsboardException if an error occurs while parsing
*/
public static SparkplugTopic parseTopicSubscribe(String topic) throws ThingsboardException {
// TODO "+", "$"
topic = topic.indexOf("#") > 0 ? topic.substring(0, topic.indexOf("#")) : topic;
return parseTopic(SparkplugTopicUtil.getSplitTopic(topic));
}
public static SparkplugTopic parseTopicPublish(String topic) throws ThingsboardException {
if (topic.contains("#") || topic.contains("$") || topic.contains("+")) {
throw new ThingsboardException("Invalid of topic elements for Publish", ThingsboardErrorCode.INVALID_ARGUMENTS);
} else {
String[] splitTopic = SparkplugTopicUtil.getSplitTopic(topic);
if (splitTopic.length < 4 || splitTopic.length > 5) {
throw new ThingsboardException(TOPIC_INVALID_NUMBER + splitTopic.length, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
return parseTopic(splitTopic);
}
}
/**
* Parses a Sparkplug MQTT message topic string and returns a {@link SparkplugTopic} instance.
*
* @param splitTopic a topic split into tokens
* @return a {@link SparkplugTopic} instance
* @throws Exception if an error occurs while parsing
*/
@SuppressWarnings("incomplete-switch")
public static SparkplugTopic parseTopic(String[] splitTopic) throws ThingsboardException {
int length = splitTopic.length;
if (length == 0) {
throw new ThingsboardException(TOPIC_INVALID_NUMBER + length, ThingsboardErrorCode.INVALID_ARGUMENTS);
} else {
SparkplugMessageType type;
String namespace, edgeNodeId, groupId, deviceId;
namespace = validateNameSpace(splitTopic[0]);
groupId = length > 1 ? splitTopic[1] : null;
type = length > 2 ? SparkplugMessageType.parseMessageType(splitTopic[2]) : null;
edgeNodeId = length > 3 ? splitTopic[3] : null;
deviceId = length > 4 ? splitTopic[4] : null;
return new SparkplugTopic(namespace, groupId, edgeNodeId, deviceId, type);
}
}
/**
* For the Sparkplug B version of the specification, the UTF-8 string constant for the namespace element will be: "spBv1.0"
* @param nameSpace
* @return
*/
private static String validateNameSpace(String nameSpace) throws ThingsboardException {
if (NAMESPACE.equals(nameSpace)) return nameSpace;
throw new ThingsboardException("The namespace [" + nameSpace + "] is not valid and must be [" + NAMESPACE + "] for the Sparkplug™ B version.", ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util.sparkplug;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
@Documented
@Target(ElementType.TYPE)
public @interface SpecVersion {
String spec() default "";
String version();
}

View File

@ -21,51 +21,71 @@ import "google/protobuf/any.proto";
option java_package = "org.thingsboard.server.gen.transport.mqtt";
option java_outer_classname = "SparkplugBProto";
message Payload {
/*
// Indexes of Data Types
// Unknown placeholder for future expansion.
Unknown = 0;
// Basic Types
Int8 = 1;
Int16 = 2;
Int32 = 3;
Int64 = 4;
UInt8 = 5;
UInt16 = 6;
UInt32 = 7;
UInt64 = 8;
Float = 9;
Double = 10;
Boolean = 11;
String = 12;
DateTime = 13;
Text = 14;
// Additional Metric Types
UUID = 15;
DataSet = 16;
Bytes = 17;
File = 18;
Template = 19;
// Additional PropertyValue Types
PropertySet = 20;
PropertySetList = 21;
*/
enum DataType {
// Indexes of Data Types
// Unknown placeholder for future expansion.
Unknown = 0;
// Basic Types
Int8 = 1;
Int16 = 2;
Int32 = 3;
Int64 = 4;
UInt8 = 5;
UInt16 = 6;
UInt32 = 7;
UInt64 = 8;
Float = 9;
Double = 10;
Boolean = 11;
String = 12;
DateTime = 13;
Text = 14;
// Additional Metric Types
UUID = 15;
DataSet = 16;
Bytes = 17;
File = 18;
Template = 19;
// Additional PropertyValue Types
PropertySet = 20;
PropertySetList = 21;
// Array Types
Int8Array = 22;
Int16Array = 23;
Int32Array = 24;
Int64Array = 25;
UInt8Array = 26;
UInt16Array = 27;
UInt32Array = 28;
UInt64Array = 29;
FloatArray = 30;
DoubleArray = 31;
BooleanArray = 32;
StringArray = 33;
DateTimeArray = 34;
}
message Payload {
message Template {
message Parameter {
optional string name = 1;
optional uint32 type = 2;
optional string name = 1;
optional uint32 type = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
ParameterValueExtension extension_value = 9;
}
@ -74,12 +94,12 @@ message Payload {
}
}
optional string version = 1; // The version of the Template to prevent mismatches
repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
repeated Parameter parameters = 3;
optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
optional bool is_definition = 5;
google.protobuf.Any extensions = 6;
optional string version = 1; // The version of the Template to prevent mismatches
repeated Metric metrics = 2; // Each metric is the name of the metric and the datatype of the member but does not contain a value
repeated Parameter parameters = 3;
optional string template_ref = 4; // Reference to a template if this is extending a Template or an instance - must exist if an instance
optional bool is_definition = 5;
google.protobuf.Any extensions = 6;
}
message DataSet {
@ -87,118 +107,118 @@ message Payload {
message DataSetValue {
oneof value {
uint32 int_value = 1;
uint64 long_value = 2;
float float_value = 3;
double double_value = 4;
bool boolean_value = 5;
string string_value = 6;
DataSetValueExtension extension_value = 7;
uint32 int_value = 1;
uint64 long_value = 2;
float float_value = 3;
double double_value = 4;
bool boolean_value = 5;
string string_value = 6;
DataSetValueExtension extension_value = 7;
}
message DataSetValueExtension {
google.protobuf.Any extensions = 1;
google.protobuf.Any extensions = 1;
}
}
message Row {
repeated DataSetValue elements = 1;
google.protobuf.Any extensions = 2; // For third party extensions
repeated DataSetValue elements = 1;
google.protobuf.Any extensions = 2; // For third party extensions
}
optional uint64 num_of_columns = 1;
repeated string columns = 2;
repeated uint32 types = 3;
repeated Row rows = 4;
google.protobuf.Any extensions = 5; // For third party extensions
optional uint64 num_of_columns = 1;
repeated string columns = 2;
repeated uint32 types = 3;
repeated Row rows = 4;
google.protobuf.Any extensions = 5; // For third party extensions
}
message PropertyValue {
optional uint32 type = 1;
optional bool is_null = 2;
optional uint32 type = 1;
optional bool is_null = 2;
oneof value {
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
PropertySet propertyset_value = 9;
PropertySetList propertysets_value = 10; // List of Property Values
PropertyValueExtension extension_value = 11;
uint32 int_value = 3;
uint64 long_value = 4;
float float_value = 5;
double double_value = 6;
bool boolean_value = 7;
string string_value = 8;
PropertySet propertyset_value = 9;
PropertySetList propertysets_value = 10; // List of Property Values
PropertyValueExtension extension_value = 11;
}
message PropertyValueExtension {
google.protobuf.Any extensions = 1;
google.protobuf.Any extensions = 1;
}
}
message PropertySet {
repeated string keys = 1; // Names of the properties
repeated PropertyValue values = 2;
google.protobuf.Any extensions = 3;
repeated string keys = 1; // Names of the properties
repeated PropertyValue values = 2;
google.protobuf.Any extensions = 3;
}
message PropertySetList {
repeated PropertySet propertyset = 1;
google.protobuf.Any extensions = 2;
google.protobuf.Any extensions = 2;
}
message MetaData {
// Bytes specific metadata
optional bool is_multi_part = 1;
optional bool is_multi_part = 1;
// General metadata
optional string content_type = 2; // Content/Media type
optional uint64 size = 3; // File size, String size, Multi-part size, etc
optional uint64 seq = 4; // Sequence number for multi-part messages
optional string content_type = 2; // Content/Media type
optional uint64 size = 3; // File size, String size, Multi-part size, etc
optional uint64 seq = 4; // Sequence number for multi-part messages
// File metadata
optional string file_name = 5; // File name
optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
optional string md5 = 7; // md5 of data
optional string file_name = 5; // File name
optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
optional string md5 = 7; // md5 of data
// Catchalls and future expansion
optional string description = 8; // Could be anything such as json or xml of custom properties
google.protobuf.Any extensions = 9;
optional string description = 8; // Could be anything such as json or xml of custom properties
google.protobuf.Any extensions = 9;
}
message Metric {
optional string name = 1; // Metric name - should only be included on birth
optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
optional uint32 datatype = 4; // DataType of the metric/tag value
optional bool is_historical = 5; // If this is historical data and should not update real time tag
optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
optional MetaData metadata = 8; // Metadata for the payload
optional string name = 1; // Metric name - should only be included on birth
optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
optional uint32 datatype = 4; // DataType of the metric/tag value
optional bool is_historical = 5; // If this is historical data and should not update real time tag
optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
optional MetaData metadata = 8; // Metadata for the payload
optional PropertySet properties = 9;
oneof value {
uint32 int_value = 10;
uint64 long_value = 11;
float float_value = 12;
double double_value = 13;
bool boolean_value = 14;
string string_value = 15;
bytes bytes_value = 16; // Bytes, File
DataSet dataset_value = 17;
Template template_value = 18;
MetricValueExtension extension_value = 19;
uint32 int_value = 10;
uint64 long_value = 11;
float float_value = 12;
double double_value = 13;
bool boolean_value = 14;
string string_value = 15;
bytes bytes_value = 16; // Bytes, File
DataSet dataset_value = 17;
Template template_value = 18;
MetricValueExtension extension_value = 19;
}
message MetricValueExtension {
google.protobuf.Any extensions = 1;
google.protobuf.Any extensions = 1;
}
}
optional uint64 timestamp = 1; // Timestamp at message sending time
repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
optional uint64 seq = 3; // Sequence number
optional string uuid = 4; // UUID to track message type in terms of schema definitions
optional bytes body = 5; // To optionally bypass the whole definition above
google.protobuf.Any extensions = 6;
optional uint64 timestamp = 1; // Timestamp at message sending time
repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
optional uint64 seq = 3; // Sequence number
optional string uuid = 4; // UUID to track message type in terms of schema definitions
optional bytes body = 5; // To optionally bypass the whole definition above
google.protobuf.Any extensions = 6;
}