Merge pull request #8084 from thingsboard/attribures_tests
[3.4.5] sparkplug: rpc/attribute
This commit is contained in:
commit
7f46a821b9
@ -104,6 +104,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
|
||||
mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(config.getAttributesTopicFilter());
|
||||
}
|
||||
mqttDeviceProfileTransportConfiguration.setSparkPlug(config.isSparkPlug());
|
||||
mqttDeviceProfileTransportConfiguration.setSparkPlugAttributesMetricNames(config.sparkPlugAttributesMetricNames);
|
||||
mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(config.isSendAckOnValidationException());
|
||||
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration;
|
||||
if (TransportPayloadType.JSON.equals(transportPayloadType)) {
|
||||
|
||||
@ -20,6 +20,8 @@ import lombok.Data;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.TransportPayloadType;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class MqttTestConfigProperties {
|
||||
@ -27,6 +29,7 @@ public class MqttTestConfigProperties {
|
||||
String deviceName;
|
||||
String gatewayName;
|
||||
boolean isSparkPlug;
|
||||
Set<String> sparkPlugAttributesMetricNames;
|
||||
|
||||
TransportPayloadType transportPayloadType;
|
||||
|
||||
|
||||
@ -28,6 +28,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
|
||||
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
|
||||
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
|
||||
import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.TransportPayloadType;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
@ -48,8 +49,12 @@ import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
|
||||
import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
|
||||
@ -84,10 +89,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
|
||||
protected int seq = 0;
|
||||
protected static final long PUBLISH_TS_DELTA_MS = 86400000;// Publish start TS <-> 24h
|
||||
|
||||
// NBIRTH
|
||||
protected static final String keyNodeRebirth = "Node Control/Rebirth";
|
||||
|
||||
//*BIRTH
|
||||
protected static final MetricDataType metricBirthDataType_Int32 = Int32;
|
||||
protected static final String metricBirthName_Int32 = "Device Metric int32";
|
||||
protected Set<String> sparkPlugAttributesMetricNames;
|
||||
|
||||
public void beforeSparkplugTest() throws Exception {
|
||||
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
|
||||
.gatewayName("Test Connect Sparkplug client node")
|
||||
.isSparkPlug(true)
|
||||
.sparkPlugAttributesMetricNames(sparkPlugAttributesMetricNames)
|
||||
.transportPayloadType(TransportPayloadType.PROTOBUF)
|
||||
.build();
|
||||
processBeforeTest(configProperties);
|
||||
@ -123,6 +137,50 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
|
||||
MqttConnAck connAckMsg = (MqttConnAck) response;
|
||||
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
|
||||
}
|
||||
protected List<Device> connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices, long ts) throws Exception {
|
||||
List<Device> devices = new ArrayList<>();
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
MetricDataType metricDataType = Int32;
|
||||
String key = "Node Metric int32";
|
||||
int valueDeviceInt32 = 1024;
|
||||
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
|
||||
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
|
||||
.setTimestamp(ts)
|
||||
.setSeq(getBdSeqNum());
|
||||
payloadBirthNode.addMetrics(metric);
|
||||
payloadBirthNode.setTimestamp(ts);
|
||||
if (client.isConnected()) {
|
||||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
|
||||
payloadBirthNode.build().toByteArray(), 0, false);
|
||||
}
|
||||
|
||||
valueDeviceInt32 = 4024;
|
||||
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32);
|
||||
for (int i = 0; i < cntDevices; i++) {
|
||||
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
|
||||
.setTimestamp(ts)
|
||||
.setSeq(getSeqNum());
|
||||
String deviceName = deviceId + "_" + i;
|
||||
|
||||
payloadBirthDevice.addMetrics(metric);
|
||||
if (client.isConnected()) {
|
||||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
|
||||
payloadBirthDevice.build().toByteArray(), 0, false);
|
||||
AtomicReference<Device> device = new AtomicReference<>();
|
||||
await(alias + "find device [" + deviceName + "] after created")
|
||||
.atMost(200, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
|
||||
return device.get() != null;
|
||||
});
|
||||
devices.add(device.get());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Assert.assertEquals(cntDevices, devices.size());
|
||||
return devices;
|
||||
}
|
||||
|
||||
protected long getBdSeqNum() throws Exception {
|
||||
if (bdSeq == 256) {
|
||||
@ -138,17 +196,16 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
|
||||
return seq++;
|
||||
}
|
||||
|
||||
|
||||
protected void connectionWithBirth(List<String> listKeys, MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception {
|
||||
protected List<String> connectionWithNBirth(MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception {
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
|
||||
.setTimestamp(calendar.getTimeInMillis());
|
||||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
|
||||
long valueBdSec = getBdSeqNum();
|
||||
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64));
|
||||
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
|
||||
String keyRebirth = "Node Control/Rebirth";
|
||||
payloadBirthNode.addMetrics(createMetric(false, ts, keyRebirth, MetricDataType.Boolean));
|
||||
listKeys.add(keyRebirth);
|
||||
payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean));
|
||||
listKeys.add(keyNodeRebirth);
|
||||
|
||||
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType));
|
||||
listKeys.add(metricKey);
|
||||
@ -157,6 +214,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
|
||||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
|
||||
payloadBirthNode.build().toByteArray(), 0, false);
|
||||
}
|
||||
return listKeys;
|
||||
}
|
||||
|
||||
protected void createdAddMetricValuePrimitiveTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys,
|
||||
@ -369,11 +427,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage mqttMsg) throws Exception {
|
||||
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(mqttMsg.getPayload());
|
||||
System.out.println("Message Arrived on topic " + topic);
|
||||
for (SparkplugBProto.Payload.Metric metric : sparkplugBProtoNode.getMetricsList()) {
|
||||
System.out.println("Metric: " + metric.toString());
|
||||
messageArrivedMetrics.add(metric);
|
||||
}
|
||||
messageArrivedMetrics.addAll(sparkplugBProtoNode.getMetricsList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -15,22 +15,24 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.sparkplug.attributes;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
|
||||
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
|
||||
|
||||
/**
|
||||
* Created by nickAS21 on 12.01.23
|
||||
@ -38,26 +40,27 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
|
||||
@Slf4j
|
||||
public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends AbstractMqttV5ClientSparkplugTest {
|
||||
|
||||
protected ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||
/**
|
||||
* "sparkPlugAttributesMetricNames": ["SN node", "SN device", "Firmware version", "Date version", "Last date update"]
|
||||
* @throws Exception
|
||||
*/
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32());
|
||||
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
// Shared attribute "Node Control/Rebirth" = true. type = NCMD.
|
||||
String key = "Node Control/Rebirth";
|
||||
boolean value = true;
|
||||
Assert.assertTrue(listKeys.contains(key));
|
||||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + key + "\":" + value + "}";
|
||||
Assert.assertTrue(listKeys.contains(keyNodeRebirth));
|
||||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}";
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
await(alias + SparkplugMessageType.NBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(key, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(keyNodeRebirth, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue());
|
||||
}
|
||||
|
||||
@ -70,13 +73,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
*/
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
MetricDataType metricDataType = MetricDataType.Boolean;
|
||||
String metricKey = "MyBoolean";
|
||||
Object metricValue = nextBoolean();
|
||||
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
|
||||
connectionWithNBirth(metricDataType, metricKey, metricValue);
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
// Boolean <-> String
|
||||
boolean expectedValue = true;
|
||||
@ -134,13 +136,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
MetricDataType metricDataType = UInt32;
|
||||
String metricKey = "MyLong";
|
||||
Object metricValue = nextUInt32();
|
||||
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
|
||||
connectionWithNBirth(metricDataType, metricKey, metricValue);
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
// Long <-> String
|
||||
String valueStr = "123";
|
||||
@ -186,13 +187,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
MetricDataType metricDataType = MetricDataType.Float;
|
||||
String metricKey = "MyFloat";
|
||||
Object metricValue = nextFloat(30, 400);
|
||||
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
|
||||
connectionWithNBirth(metricDataType, metricKey, metricValue);
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
// Float <-> String
|
||||
String valueStr = "123.345";
|
||||
@ -238,13 +238,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
MetricDataType metricDataType = MetricDataType.Double;
|
||||
String metricKey = "MyDouble";
|
||||
Object metricValue = nextDouble();
|
||||
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
|
||||
connectionWithNBirth(metricDataType, metricKey, metricValue);
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
// Double <-> String
|
||||
String valueStr = "123345456";
|
||||
@ -290,13 +289,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
MetricDataType metricDataType = MetricDataType.String;
|
||||
String metricKey = "MyString";
|
||||
Object metricValue = nextString();
|
||||
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
|
||||
connectionWithNBirth(metricDataType, metricKey, metricValue);
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
// String <-> Long
|
||||
long valueLong = 123345456L;
|
||||
@ -340,4 +338,99 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
|
||||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue());
|
||||
}
|
||||
|
||||
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
|
||||
|
||||
// Integer <-> Integer
|
||||
int expectedValueInt = 123456;
|
||||
|
||||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + expectedValueInt + "}";
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
await(alias + SparkplugMessageType.DBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
}
|
||||
|
||||
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
|
||||
|
||||
// Int <-> String
|
||||
String valueStr = "123";
|
||||
long expectedValue = Long.valueOf(valueStr);
|
||||
|
||||
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueStr + "}";
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
await(alias + SparkplugMessageType.DBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
mqttCallback.deleteMessageArrivedMetrics(0);
|
||||
|
||||
// Int <-> Boolean
|
||||
Boolean valueBoolean = true;
|
||||
expectedValue = 1;
|
||||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}";
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
await(alias + SparkplugMessageType.NBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
mqttCallback.deleteMessageArrivedMetrics(0);
|
||||
|
||||
valueBoolean = false;
|
||||
expectedValue = 0;
|
||||
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}";
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
await(alias + SparkplugMessageType.NBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
}
|
||||
|
||||
protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
|
||||
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
|
||||
await(alias + SparkplugMessageType.NBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
|
||||
}));
|
||||
return actualKeys.get().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0));
|
||||
}
|
||||
|
||||
protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
|
||||
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
|
||||
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
|
||||
await(alias + SparkplugMessageType.DBIRTH.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
|
||||
}));
|
||||
return actualKeys.get().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.sparkplug.attributes;
|
||||
|
||||
import org.eclipse.paho.mqttv5.common.MqttException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
||||
/**
|
||||
* Created by nickAS21 on 12.01.23
|
||||
*/
|
||||
@DaoSqlTest
|
||||
public class MqttV5ClientSparkplugBAttributesInProfileTest extends AbstractMqttV5ClientSparkplugAttributesTest {
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
sparkPlugAttributesMetricNames = new HashSet<>();
|
||||
sparkPlugAttributesMetricNames.add(metricBirthName_Int32);
|
||||
beforeSparkplugTest();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest () throws MqttException {
|
||||
if (client.isConnected()) {
|
||||
client.disconnect(); }
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
|
||||
processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
|
||||
processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes();
|
||||
}
|
||||
|
||||
}
|
||||
@ -68,4 +68,14 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp
|
||||
processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception {
|
||||
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
|
||||
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk();
|
||||
}
|
||||
|
||||
}
|
||||
@ -22,31 +22,33 @@ import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
|
||||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
|
||||
|
||||
/**
|
||||
* Created by nickAS21 on 12.01.23
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest {
|
||||
public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest {
|
||||
|
||||
protected void processClientWithCorrectNodeAccessTokenWithNDEATH_Test() throws Exception {
|
||||
long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS;
|
||||
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
|
||||
long value = bdSeq = 0;
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH(ts, value);
|
||||
|
||||
@ -69,38 +71,97 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr
|
||||
String expectedMessage = "Server unavailable.";
|
||||
int expectedReasonCode = 136;
|
||||
Assert.assertEquals(expectedMessage, actualException.getMessage());
|
||||
Assert.assertEquals( expectedReasonCode, actualException.getReasonCode());
|
||||
Assert.assertEquals(expectedReasonCode, actualException.getReasonCode());
|
||||
}
|
||||
|
||||
protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
long ts = calendar.getTimeInMillis();
|
||||
MetricDataType metricDataType = Int32;
|
||||
Set<String> deviceIds = new HashSet<>();
|
||||
String key = "Device Metric int32";
|
||||
int valueDeviceInt32 = 1024;
|
||||
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
|
||||
for (int i=0; i < cntDevices; i++ ) {
|
||||
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
|
||||
.setTimestamp(calendar.getTimeInMillis())
|
||||
.setSeq(getSeqNum());
|
||||
String deviceName = deviceId + "_" + i;
|
||||
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
|
||||
}
|
||||
|
||||
payloadBirthDevice.addMetrics(metric);
|
||||
if (client.isConnected()) {
|
||||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
|
||||
payloadBirthDevice.build().toByteArray(), 0, false);
|
||||
AtomicReference<Device> device = new AtomicReference<>();
|
||||
await(alias + "find device [" + deviceName + "] after created")
|
||||
.atMost(200, TimeUnit.SECONDS)
|
||||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(int cntDevices) throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
|
||||
|
||||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name()));
|
||||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
|
||||
await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
|
||||
return finalFuture.get().get().contains(tsKvEntry);
|
||||
});
|
||||
|
||||
for (Device device : devices) {
|
||||
await(alias + messageName(STATE) + ", device: " + device.getName())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
|
||||
return finalFuture.get().get().contains(tsKvEntry);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(int cntDevices, int indexDeviceDisconnect) throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
|
||||
|
||||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name()));
|
||||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
|
||||
|
||||
SparkplugBProto.Payload.Builder payloadDeathDevice = SparkplugBProto.Payload.newBuilder()
|
||||
.setTimestamp(ts)
|
||||
.setSeq(getSeqNum());
|
||||
if (client.isConnected()) {
|
||||
List<Device> devicesList = new ArrayList<>(devices);
|
||||
Device device = devicesList.get(indexDeviceDisconnect);
|
||||
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(),
|
||||
payloadDeathDevice.build().toByteArray(), 0, false);
|
||||
await(alias + messageName(STATE) + ", device: " + device.getName())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
|
||||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(int cntDevices) throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
|
||||
|
||||
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name()));
|
||||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
|
||||
|
||||
if (client.isConnected()) {
|
||||
client.disconnect();
|
||||
|
||||
await(alias + messageName(STATE) + ", device: " + savedGateway.getName())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId()));
|
||||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry);
|
||||
});
|
||||
|
||||
List<Device> devicesList = new ArrayList<>(devices);
|
||||
for (Device device : devicesList) {
|
||||
await(alias + messageName(STATE) + ", device: " + device.getName())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
|
||||
return device.get() != null;
|
||||
finalFuture.set(tsService.findAllLatest(tenantId, device.getId()));
|
||||
return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry);
|
||||
});
|
||||
}
|
||||
deviceIds.add(deviceName);
|
||||
}
|
||||
Assert.assertEquals(cntDevices, deviceIds.size());
|
||||
}
|
||||
|
||||
private boolean findEqualsKeyValueInKvEntrys(List<TsKvEntry> finalFuture, TsKvEntry tsKvEntry) {
|
||||
for (TsKvEntry kvEntry : finalFuture) {
|
||||
if (kvEntry.getKey().equals(tsKvEntry.getKey()) && kvEntry.getValue().equals(tsKvEntry.getValue())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -60,4 +60,19 @@ public class MqttV5ClientSparkplugBConnectionTest extends AbstractMqttV5ClientSp
|
||||
processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL() throws Exception {
|
||||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE() throws Exception {
|
||||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(3, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All() throws Exception {
|
||||
processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(3);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,107 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.sparkplug.rpc;
|
||||
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest {
|
||||
|
||||
private static final int metricBirthValue_Int32 = 123456;
|
||||
private static final String sparkplugRpcRequest = "{\"metricName\":\"" + metricBirthName_Int32 + "\",\"value\":" + metricBirthValue_Int32 + "}";
|
||||
|
||||
@Test
|
||||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}";
|
||||
String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway);
|
||||
await(alias + SparkplugMessageType.NCMD.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(expected, actual);
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
|
||||
long ts = calendar.getTimeInMillis();
|
||||
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
|
||||
String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}";
|
||||
String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0));
|
||||
await(alias + NCMD.name())
|
||||
.atMost(40, TimeUnit.SECONDS)
|
||||
.until(() -> {
|
||||
return mqttCallback.getMessageArrivedMetrics().size() == 1;
|
||||
});
|
||||
Assert.assertEquals(expected, actual);
|
||||
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
|
||||
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
String invalidateTypeMessageName = "RCMD";
|
||||
String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " +
|
||||
invalidateTypeMessageName + "{\\\"metricName\\\":\\\"" + metricBirthName_Int32 + "\\\",\\\"value\\\":" + metricBirthValue_Int32 + "}\"}";
|
||||
String actual = sendRPCSparkplug(invalidateTypeMessageName, sparkplugRpcRequest, savedGateway);
|
||||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
|
||||
String metricNameBad = metricBirthName_Int32 + "_Bad";
|
||||
String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}";
|
||||
String expected = "{\"result\":\"BAD_REQUEST_PARAMS\",\"error\":\"Failed send To Node Rpc Request: " +
|
||||
DCMD.name() + ". This node does not have a metricName: [" + metricNameBad + "]\"}";
|
||||
String actual = sendRPCSparkplug(DCMD.name(), sparkplugRpcRequestBad, savedGateway);
|
||||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
private String sendRPCSparkplug(String nameTypeMessage, String keyValue, Device device) throws Exception {
|
||||
String setRpcRequest = "{\"method\": \"" + nameTypeMessage + "\", \"params\": " + keyValue + "}";
|
||||
return doPostAsync("/api/plugins/rpc/twoway/" + device.getId().getId(), setRpcRequest, String.class, status().isOk());
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,61 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.sparkplug.rpc;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.mqttv5.common.MqttException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
|
||||
@DaoSqlTest
|
||||
@Slf4j
|
||||
public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest {
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
beforeSparkplugTest();
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() throws MqttException {
|
||||
if (client.isConnected()) {
|
||||
client.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
|
||||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
|
||||
processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {
|
||||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception {
|
||||
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS();
|
||||
}
|
||||
|
||||
}
|
||||
@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
|
||||
|
||||
/**
|
||||
* Created by nickAS21 on 12.01.23
|
||||
@ -39,8 +38,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
|
||||
|
||||
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
|
||||
clientWithCorrectNodeAccessTokenWithNDEATH();
|
||||
List<String> listKeys = new ArrayList<>();
|
||||
connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32());
|
||||
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
|
||||
Assert.assertTrue("Connection node is failed", client.isConnected());
|
||||
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
|
||||
await(alias + SparkplugMessageType.NBIRTH.name())
|
||||
|
||||
@ -43,6 +43,8 @@ import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.leshan.core.ResponseCode;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
@ -82,6 +84,8 @@ import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler
|
||||
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
|
||||
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcResponseBody;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
|
||||
|
||||
import javax.net.ssl.SSLPeerUnverifiedException;
|
||||
@ -110,8 +114,11 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
|
||||
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish;
|
||||
|
||||
/**
|
||||
@ -394,12 +401,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
// TODO
|
||||
break;
|
||||
case NBIRTH:
|
||||
sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList());
|
||||
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
|
||||
break;
|
||||
case NCMD:
|
||||
case NDATA:
|
||||
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
|
||||
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
|
||||
break;
|
||||
case NRECORD:
|
||||
// TODO
|
||||
@ -416,7 +420,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
case DBIRTH:
|
||||
case DCMD:
|
||||
case DDATA:
|
||||
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
|
||||
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
|
||||
break;
|
||||
/**
|
||||
* TODO
|
||||
@ -794,12 +798,21 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
registerSubQoS(topic, grantedQoSList, reqQoS);
|
||||
}
|
||||
|
||||
public void processAttributesSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
|
||||
private void processAttributesSubscribe(List<Integer> grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
|
||||
attrSubTopicType = topicType;
|
||||
registerSubQoS(topic, grantedQoSList, reqQoS);
|
||||
}
|
||||
|
||||
public void processAttributesRpcSubscribeSparkplugNode(List<Integer> grantedQoSList, MqttQoS reqQoS) {
|
||||
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
||||
.setSessionInfo(deviceSessionCtx.getSessionInfo())
|
||||
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
|
||||
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
|
||||
.build(), null);
|
||||
registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS);
|
||||
}
|
||||
|
||||
public void registerSubQoS(String topic, List<Integer> grantedQoSList, MqttQoS reqQoS) {
|
||||
grantedQoSList.add(getMinSupportedQos(reqQoS));
|
||||
mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS));
|
||||
@ -1088,7 +1101,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
if (sparkplugTopicNode != null) {
|
||||
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
|
||||
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode);
|
||||
sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode,
|
||||
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode,
|
||||
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode);
|
||||
} else {
|
||||
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName());
|
||||
@ -1129,8 +1142,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
gatewaySessionHandler.onDevicesDisconnect();
|
||||
}
|
||||
if (sparkplugSessionHandler != null) {
|
||||
// add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp()
|
||||
sparkplugSessionHandler.stateSparkplugtSendOnTelemetry(deviceSessionCtx.getSessionInfo(),
|
||||
// add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp()
|
||||
sparkplugSessionHandler.sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(),
|
||||
deviceSessionCtx.getDeviceInfo().getDeviceName(), OFFLINE, new Date().getTime());
|
||||
sparkplugSessionHandler.onDevicesDisconnect();
|
||||
}
|
||||
@ -1139,7 +1152,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
deviceSessionCtx.release();
|
||||
}
|
||||
|
||||
|
||||
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
|
||||
if (!msg.hasDeviceInfo()) {
|
||||
context.onAuthFailure(address);
|
||||
@ -1206,8 +1218,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
@Override
|
||||
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
|
||||
log.trace("[{}] Received attributes update notification to device", sessionId);
|
||||
String topic = attrSubTopicType.getAttributesSubTopic();
|
||||
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType);
|
||||
try {
|
||||
if (sparkplugSessionHandler != null) {
|
||||
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
|
||||
@ -1222,6 +1232,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
}
|
||||
});
|
||||
} else {
|
||||
String topic = attrSubTopicType.getAttributesSubTopic();
|
||||
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType);
|
||||
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -1239,39 +1251,77 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
|
||||
log.trace("[{}] Received RPC command to device", sessionId);
|
||||
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
|
||||
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
|
||||
try {
|
||||
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> {
|
||||
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
|
||||
if (isAckExpected(payload)) {
|
||||
rpcAwaitingAck.put(msgId, rpcRequest);
|
||||
context.getScheduler().schedule(() -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
|
||||
if (msg != null) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
|
||||
if (sparkplugSessionHandler != null) {
|
||||
/**
|
||||
* NCMD {"metricName":"MyNodeMetric05_String","value":"MyNodeMetric05_String_Value"}
|
||||
* NCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444}
|
||||
* NCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333}
|
||||
* NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222}
|
||||
* NCMD {"metricName":"Node Control/Rebirth","value":false}
|
||||
* NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]}
|
||||
*/
|
||||
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
|
||||
if (messageType == null) {
|
||||
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
||||
return;
|
||||
}
|
||||
var cf = publish(payload, deviceSessionCtx);
|
||||
cf.addListener(result -> {
|
||||
if (result.cause() == null) {
|
||||
if (!isAckExpected(payload)) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
} else if (rpcRequest.getPersisted()) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
} else {
|
||||
// TODO: send error
|
||||
}
|
||||
});
|
||||
});
|
||||
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
|
||||
header.setMessageType(messageType.name());
|
||||
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
|
||||
if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
|
||||
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
|
||||
messageType);
|
||||
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
|
||||
sparkplugTopic.toString(),
|
||||
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
|
||||
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
|
||||
} else {
|
||||
sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " +
|
||||
rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
|
||||
}
|
||||
} else {
|
||||
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
|
||||
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
|
||||
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic)
|
||||
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(),
|
||||
TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
|
||||
.setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
|
||||
log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
|
||||
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.INVALID_ARGUMENTS,
|
||||
"Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
||||
}
|
||||
}
|
||||
|
||||
public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
|
||||
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
|
||||
if (isAckExpected(payload)) {
|
||||
rpcAwaitingAck.put(msgId, rpcRequest);
|
||||
context.getScheduler().schedule(() -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
|
||||
if (msg != null) {
|
||||
transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
var cf = publish(payload, deviceSessionCtx);
|
||||
cf.addListener(result -> {
|
||||
if (result.cause() == null) {
|
||||
if (!isAckExpected(payload)) {
|
||||
transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
} else if (rpcRequest.getPersisted()) {
|
||||
transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
|
||||
} else {
|
||||
log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName());
|
||||
this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1311,4 +1361,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {
|
||||
String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.name()).error(errorMsg).build());
|
||||
TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build();
|
||||
transportService.process(sessionInfo, msg, null);
|
||||
}
|
||||
|
||||
public void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) {
|
||||
String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.getName()).result(successMsg).build());
|
||||
TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build();
|
||||
transportService.process(sessionInfo, msg, null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -54,7 +54,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
@ -75,8 +75,9 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG;
|
||||
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.17.
|
||||
@ -246,45 +247,45 @@ public abstract class AbstractGatewaySessionHandler {
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
try {
|
||||
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
|
||||
.setDeviceName(deviceName)
|
||||
.setDeviceType(deviceType)
|
||||
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
|
||||
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits())
|
||||
.setSparkplug(this.deviceSessionCtx.isSparkplug())
|
||||
.build(),
|
||||
new TransportServiceCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
||||
AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
||||
.setSessionInfo(deviceSessionInfo)
|
||||
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
|
||||
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
|
||||
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
|
||||
.build(), null);
|
||||
}
|
||||
futureToSet.set(devices.get(deviceName));
|
||||
deviceFutures.remove(deviceName);
|
||||
try {
|
||||
transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder()
|
||||
.setDeviceName(deviceName)
|
||||
.setDeviceType(deviceType)
|
||||
.setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits())
|
||||
.setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits())
|
||||
.setSparkplug(this.deviceSessionCtx.isSparkplug())
|
||||
.build(),
|
||||
new TransportServiceCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
||||
AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
||||
.setSessionInfo(deviceSessionInfo)
|
||||
.setSessionEvent(SESSION_EVENT_MSG_OPEN)
|
||||
.setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG)
|
||||
.setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG)
|
||||
.build(), null);
|
||||
}
|
||||
futureToSet.set(devices.get(deviceName));
|
||||
deviceFutures.remove(deviceName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
|
||||
futureToSet.setException(e);
|
||||
deviceFutures.remove(deviceName);
|
||||
}
|
||||
});
|
||||
return futureToSet;
|
||||
} catch (Throwable e) {
|
||||
deviceFutures.remove(deviceName);
|
||||
throw e;
|
||||
}
|
||||
@Override
|
||||
public void onError(Throwable e) {
|
||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
|
||||
futureToSet.setException(e);
|
||||
deviceFutures.remove(deviceName);
|
||||
}
|
||||
});
|
||||
return futureToSet;
|
||||
} catch (Throwable e) {
|
||||
deviceFutures.remove(deviceName);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg);
|
||||
@ -557,7 +558,7 @@ public abstract class AbstractGatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
|
||||
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
|
||||
}
|
||||
|
||||
@ -727,23 +728,23 @@ public abstract class AbstractGatewaySessionHandler {
|
||||
}
|
||||
|
||||
private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) {
|
||||
if (this.deviceSessionCtx.isSparkplug()){
|
||||
if (this.deviceSessionCtx.isSparkplug()) {
|
||||
// add Msg Telemetry: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp()
|
||||
stateSparkplugtSendOnTelemetry (deviceSessionCtx.getSessionInfo(),
|
||||
sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(),
|
||||
deviceSessionCtx.getDeviceInfo().getDeviceName(), OFFLINE, new Date().getTime());
|
||||
}
|
||||
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
|
||||
System.out.println("Removed device " + deviceName + " from the gateway session");
|
||||
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
|
||||
}
|
||||
|
||||
public void stateSparkplugtSendOnTelemetry (TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugMessageTypeSate typeSate, long ts) {
|
||||
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState connectionState, long ts) {
|
||||
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
|
||||
keyValueProtoBuilder.setKey(STATE.name());
|
||||
keyValueProtoBuilder.setKey(messageName(STATE));
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
|
||||
keyValueProtoBuilder.setStringV(typeSate.name());
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
|
||||
keyValueProtoBuilder.setStringV(connectionState.name());
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts);
|
||||
|
||||
transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg));
|
||||
}
|
||||
|
||||
|
||||
@ -16,16 +16,23 @@
|
||||
package org.thingsboard.server.transport.mqtt.session;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
|
||||
|
||||
@Slf4j
|
||||
public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext<SparkplugNodeSessionHandler> {
|
||||
|
||||
@ -53,4 +60,44 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
|
||||
log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId);
|
||||
try {
|
||||
/**
|
||||
* DCMD {"metricName":"MyDeviceMetricText","value":"MyNodeMetric05_String_Value"}
|
||||
* DCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444}
|
||||
* DCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333}
|
||||
* DCMD {"metricName":"MyNodeMetric04_Float","value":413.18222}
|
||||
* DCMD {"metricName":"Node Control/Rebirth","value":false}
|
||||
* DCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]}
|
||||
*/
|
||||
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
|
||||
if (messageType == null) {
|
||||
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
||||
return;
|
||||
}
|
||||
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
|
||||
header.setMessageType(messageType.name());
|
||||
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
|
||||
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
|
||||
SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
|
||||
messageType, deviceInfo.getDeviceName());
|
||||
parent.createSparkplugMqttPublishMsg(tsKvProto,
|
||||
sparkplugTopic.toString(),
|
||||
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
|
||||
.ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo));
|
||||
} else {
|
||||
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
|
||||
rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
|
||||
}
|
||||
} catch (ThingsboardException e) {
|
||||
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
||||
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
|
||||
rpcRequest.getMethodName() + ". " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -21,21 +21,24 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.eclipse.leshan.core.ResponseCode;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
|
||||
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportApiProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
|
||||
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
|
||||
import org.thingsboard.server.transport.mqtt.TopicType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
|
||||
|
||||
@ -44,6 +47,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
@ -51,7 +55,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto;
|
||||
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric;
|
||||
@ -96,37 +100,47 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
|
||||
public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
|
||||
checkDeviceName(deviceName);
|
||||
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
|
||||
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
|
||||
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
|
||||
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
|
||||
try {
|
||||
try {
|
||||
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
|
||||
// add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp()
|
||||
stateSparkplugtSendOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE,
|
||||
sendSparkplugStateOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE,
|
||||
sparkplugBProto.getTimestamp());
|
||||
contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("Failed add Metrics. MessageType *BIRTH.", e);
|
||||
}
|
||||
if (topic.isType(NBIRTH)) {
|
||||
setNodeBirthMetrics(sparkplugBProto.getMetricsList());
|
||||
} else if (topic.isType(DBIRTH)) {
|
||||
contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("Failed add Metrics or change SparkplugConnectionState. MessageType *BIRTH.", e);
|
||||
}
|
||||
onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName);
|
||||
Set<String> attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx
|
||||
.getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames();
|
||||
if (attributesMetricNames != null) {
|
||||
List<TransportApiProtos.AttributesMsg> attributesMsgList = convertToPostAttributes(sparkplugBProto, attributesMetricNames, deviceName);
|
||||
onDeviceAttributesProto(contextListenableFuture, msgId, attributesMsgList, deviceName);
|
||||
}
|
||||
List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList = convertToPostTelemetry(sparkplugBProto, attributesMetricNames, topic.getType().name());
|
||||
onDeviceTelemetryProto(contextListenableFuture, msgId, postTelemetryMsgList, deviceName);
|
||||
}
|
||||
|
||||
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
|
||||
int msgId, List<TransportProtos.PostTelemetryMsg> msgs, String deviceName) throws AdaptorException {
|
||||
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) throws AdaptorException {
|
||||
try {
|
||||
int finalMsgId = msgId;
|
||||
for (TransportProtos.PostTelemetryMsg msg : msgs) {
|
||||
postTelemetryMsgList.forEach(telemetryMsg -> {
|
||||
Futures.addCallback(contextListenableFuture,
|
||||
new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
|
||||
try {
|
||||
processPostTelemetryMsg(deviceCtx, msg, deviceName, finalMsgId);
|
||||
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId);
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, msg, e);
|
||||
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
@ -136,6 +150,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
|
||||
}
|
||||
}, context.getExecutor());
|
||||
});
|
||||
} catch (RuntimeException e) {
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
|
||||
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
|
||||
try {
|
||||
if (!CollectionUtils.isEmpty(attributesMsgList)) {
|
||||
attributesMsgList.forEach(attributesMsg -> {
|
||||
Futures.addCallback(contextListenableFuture,
|
||||
new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
|
||||
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
|
||||
try {
|
||||
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
|
||||
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
|
||||
}
|
||||
}, context.getExecutor());
|
||||
});
|
||||
} else {
|
||||
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
throw new AdaptorException(e);
|
||||
@ -152,7 +198,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
// TODO SUBSCRIBE GroupId
|
||||
} else if (sparkplugTopic.isNode()) {
|
||||
// SUBSCRIBE Node
|
||||
parent.processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1);
|
||||
parent.processAttributesRpcSubscribeSparkplugNode(grantedQoSList, reqQoS);
|
||||
} else {
|
||||
// SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED.
|
||||
// TODO: track that node subscribed to # or to particular device.
|
||||
@ -177,16 +223,18 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, String topicTypeName) throws AdaptorException {
|
||||
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set<String> attributesMetricNames, String topicTypeName) throws AdaptorException {
|
||||
try {
|
||||
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
|
||||
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
|
||||
long ts = protoMetric.getTimestamp();
|
||||
String key = "bdSeq".equals(protoMetric.getName()) ?
|
||||
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
|
||||
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
|
||||
if (keyValueProtoOpt.isPresent()) {
|
||||
msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts));
|
||||
if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) {
|
||||
long ts = protoMetric.getTimestamp();
|
||||
String key = "bdSeq".equals(protoMetric.getName()) ?
|
||||
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
|
||||
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
|
||||
if (keyValueProtoOpt.isPresent()) {
|
||||
msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -204,6 +252,39 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private List<TransportApiProtos.AttributesMsg> convertToPostAttributes(SparkplugBProto.Payload sparkplugBProto,
|
||||
Set<String> attributesMetricNames,
|
||||
String deviceName) throws AdaptorException {
|
||||
try {
|
||||
List<TransportApiProtos.AttributesMsg> msgs = new ArrayList<>();
|
||||
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
|
||||
if (attributesMetricNames.contains(protoMetric.getName())) {
|
||||
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
|
||||
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric);
|
||||
if (msgOpt.isPresent()) {
|
||||
deviceAttributesMsgBuilder.setDeviceName(deviceName);
|
||||
deviceAttributesMsgBuilder.setMsg(msgOpt.get());
|
||||
msgs.add(deviceAttributesMsgBuilder.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
return msgs;
|
||||
} catch (IllegalStateException | JsonSyntaxException | ThingsboardException e) {
|
||||
log.error("Failed to decode post telemetry request", e);
|
||||
throw new AdaptorException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
|
||||
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric);
|
||||
if (keyValueProtoOpt.isPresent()) {
|
||||
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder();
|
||||
builder.addKv(keyValueProtoOpt.get());
|
||||
return Optional.of(builder.build());
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public SparkplugTopic getSparkplugTopicNode() {
|
||||
return this.sparkplugTopicNode;
|
||||
}
|
||||
@ -238,4 +319,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
|
||||
return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
|
||||
}
|
||||
|
||||
protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
|
||||
parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo);
|
||||
}
|
||||
|
||||
protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {
|
||||
parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg);
|
||||
}
|
||||
|
||||
protected void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) {
|
||||
parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.util.sparkplug;
|
||||
|
||||
public enum SparkplugMessageTypeSate {
|
||||
public enum SparkplugConnectionState {
|
||||
/**
|
||||
* The EoN node should examine the payload of this
|
||||
* message to ensure that it is a value of “ONLINE”
|
||||
@ -87,6 +87,9 @@ public enum SparkplugMessageType {
|
||||
}
|
||||
throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS);
|
||||
}
|
||||
public static String messageName(SparkplugMessageType type) {
|
||||
return STATE.equals(type) ? "sparkplugConnectionState" : type.name();
|
||||
}
|
||||
|
||||
public boolean isDeath() {
|
||||
return this.equals(DDEATH) || this.equals(NDEATH);
|
||||
|
||||
@ -189,6 +189,41 @@ public class SparkplugMetricUtil {
|
||||
return metric;
|
||||
}
|
||||
|
||||
public static TransportProtos.TsKvProto getTsKvProto(String key, Object value, long ts) throws ThingsboardException {
|
||||
try {
|
||||
TransportProtos.TsKvProto.Builder tsKvProtoBuilder = TransportProtos.TsKvProto.newBuilder();
|
||||
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
|
||||
keyValueProtoBuilder.setKey(key);
|
||||
if (value instanceof String) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V);
|
||||
keyValueProtoBuilder.setStringV((String) value);
|
||||
} else if (value instanceof Integer) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V);
|
||||
keyValueProtoBuilder.setLongV((Integer) value);
|
||||
} else if (value instanceof Long) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V);
|
||||
keyValueProtoBuilder.setLongV((Long) value);
|
||||
} else if (value instanceof Boolean) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V);
|
||||
keyValueProtoBuilder.setBoolV((Boolean) value);
|
||||
} else if (value instanceof Double) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V);
|
||||
keyValueProtoBuilder.setDoubleV((Double) value);
|
||||
} else if (value instanceof List) {
|
||||
keyValueProtoBuilder.setType(TransportProtos.KeyValueType.JSON_V);
|
||||
ArrayNode arrayNodeBytes = JacksonUtil.convertValue(value, ArrayNode.class);
|
||||
keyValueProtoBuilder.setJsonV(arrayNodeBytes.toString());
|
||||
} else {
|
||||
throw new ThingsboardException("Failed to convert device/node RPC command to TsKvProto for Sparkplug MQT msg: value [" + value + "]", ThingsboardErrorCode.INVALID_ARGUMENTS);
|
||||
}
|
||||
tsKvProtoBuilder.setKv(keyValueProtoBuilder.build());
|
||||
tsKvProtoBuilder.setTs(ts);
|
||||
return tsKvProtoBuilder.build();
|
||||
} catch (Exception e) {
|
||||
throw new ThingsboardException("Failed to convert device/node RPC command to TsKvProto for Sparkplug MQT msg: value [" + value + "]", ThingsboardErrorCode.INVALID_ARGUMENTS);
|
||||
}
|
||||
}
|
||||
|
||||
public static Optional<Object> validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException {
|
||||
if (kv.getTypeValue() <= 3) {
|
||||
return validatedValuePrimitiveByTypeMetric(kv, metricDataType);
|
||||
@ -214,8 +249,8 @@ public class SparkplugMetricUtil {
|
||||
case UInt8:
|
||||
case UInt16:
|
||||
case Int32:
|
||||
Optional <Integer> boolInt8 = booleanStringToInt (valueOpt.get());
|
||||
if(boolInt8.isPresent()) {
|
||||
Optional<Integer> boolInt8 = booleanStringToInt(valueOpt.get());
|
||||
if (boolInt8.isPresent()) {
|
||||
return Optional.of(boolInt8.get());
|
||||
}
|
||||
try {
|
||||
@ -233,25 +268,25 @@ public class SparkplugMetricUtil {
|
||||
case Int64:
|
||||
case UInt64:
|
||||
case DateTime:
|
||||
Optional <Integer> boolInt64 = booleanStringToInt (valueOpt.get());
|
||||
if(boolInt64.isPresent()) {
|
||||
Optional<Integer> boolInt64 = booleanStringToInt(valueOpt.get());
|
||||
if (boolInt64.isPresent()) {
|
||||
return Optional.of(Long.valueOf(boolInt64.get()));
|
||||
}
|
||||
var l = new BigDecimal(valueOpt.get());
|
||||
return Optional.of(l.longValue());
|
||||
// float
|
||||
// float
|
||||
case Float:
|
||||
Optional <Integer> boolFloat = booleanStringToInt (valueOpt.get());
|
||||
if(boolFloat.isPresent()) {
|
||||
Optional<Integer> boolFloat = booleanStringToInt(valueOpt.get());
|
||||
if (boolFloat.isPresent()) {
|
||||
var fb = new BigDecimal(boolFloat.get());
|
||||
return Optional.of(fb.floatValue());
|
||||
}
|
||||
var f = new BigDecimal(valueOpt.get());
|
||||
return Optional.of(f.floatValue());
|
||||
// double
|
||||
// double
|
||||
case Double:
|
||||
Optional <Integer> boolDouble = booleanStringToInt (valueOpt.get());
|
||||
if(boolDouble.isPresent()) {
|
||||
Optional<Integer> boolDouble = booleanStringToInt(valueOpt.get());
|
||||
if (boolDouble.isPresent()) {
|
||||
return Optional.of(Double.valueOf(boolDouble.get()));
|
||||
}
|
||||
var dd = new BigDecimal(valueOpt.get());
|
||||
@ -272,7 +307,7 @@ public class SparkplugMetricUtil {
|
||||
case String:
|
||||
case Text:
|
||||
case UUID:
|
||||
return Optional.of(valueOpt.get());
|
||||
return Optional.of(valueOpt.get());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.trace("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
|
||||
@ -282,15 +317,16 @@ public class SparkplugMetricUtil {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public static Optional<Object> validatedValueJsonByTypeMetric(String arrayNodeStr, MetricDataType metricDataType) {
|
||||
public static Optional<Object> validatedValueJsonByTypeMetric(String arrayNodeStr, MetricDataType metricDataType) {
|
||||
try {
|
||||
Optional<Object> valueOpt;
|
||||
switch (metricDataType) {
|
||||
// byte[]
|
||||
case Bytes:
|
||||
List<Byte> listBytes = JacksonUtil.fromString(arrayNodeStr, new TypeReference<>() {});
|
||||
List<Byte> listBytes = JacksonUtil.fromString(arrayNodeStr, new TypeReference<>() {
|
||||
});
|
||||
byte[] bytes = new byte[listBytes.size()];
|
||||
for(int i = 0; i < listBytes.size(); i++) {
|
||||
for (int i = 0; i < listBytes.size(); i++) {
|
||||
bytes[i] = listBytes.get(i).byteValue();
|
||||
}
|
||||
return Optional.of(bytes);
|
||||
@ -324,7 +360,7 @@ public class SparkplugMetricUtil {
|
||||
}
|
||||
}
|
||||
|
||||
private static Optional<Integer> booleanStringToInt (String booleanStr) {
|
||||
private static Optional<Integer> booleanStringToInt(String booleanStr) {
|
||||
if ("true".equals(booleanStr)) {
|
||||
return Optional.of(1);
|
||||
} else if ("false".equals(booleanStr)) {
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.util.sparkplug;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class SparkplugRpcRequestHeader {
|
||||
|
||||
private String messageType;
|
||||
private String metricName;
|
||||
private Object value;
|
||||
|
||||
}
|
||||
@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.util.sparkplug;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class SparkplugRpcResponseBody {
|
||||
|
||||
private String result;
|
||||
private String value;
|
||||
private String error;
|
||||
|
||||
}
|
||||
@ -46,7 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
|
||||
protected volatile DeviceProfile deviceProfile;
|
||||
@Getter
|
||||
@Setter
|
||||
private volatile TransportProtos.SessionInfoProto sessionInfo;
|
||||
protected volatile TransportProtos.SessionInfoProto sessionInfo;
|
||||
|
||||
@Setter
|
||||
private volatile boolean connected;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user