diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java index 28e458d971..05ebedd6e8 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java @@ -104,6 +104,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(config.getAttributesTopicFilter()); } mqttDeviceProfileTransportConfiguration.setSparkPlug(config.isSparkPlug()); + mqttDeviceProfileTransportConfiguration.setSparkPlugAttributesMetricNames(config.sparkPlugAttributesMetricNames); mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(config.isSendAckOnValidationException()); TransportPayloadTypeConfiguration transportPayloadTypeConfiguration; if (TransportPayloadType.JSON.equals(transportPayloadType)) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java index 1a2b6eefa4..599b18ff17 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java @@ -20,6 +20,8 @@ import lombok.Data; import org.thingsboard.server.common.data.DeviceProfileProvisionType; import org.thingsboard.server.common.data.TransportPayloadType; +import java.util.Set; + @Data @Builder public class MqttTestConfigProperties { @@ -27,6 +29,7 @@ public class MqttTestConfigProperties { String deviceName; String gatewayName; boolean isSparkPlug; + Set sparkPlugAttributesMetricNames; TransportPayloadType transportPayloadType; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index e949888733..00bf9c7c3a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -28,6 +28,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -48,8 +49,12 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static org.awaitility.Awaitility.await; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; import static org.thingsboard.common.util.JacksonUtil.newArrayNode; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; @@ -84,10 +89,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte protected int seq = 0; protected static final long PUBLISH_TS_DELTA_MS = 86400000;// Publish start TS <-> 24h + // NBIRTH + protected static final String keyNodeRebirth = "Node Control/Rebirth"; + + //*BIRTH + protected static final MetricDataType metricBirthDataType_Int32 = Int32; + protected static final String metricBirthName_Int32 = "Device Metric int32"; + protected Set sparkPlugAttributesMetricNames; + public void beforeSparkplugTest() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .gatewayName("Test Connect Sparkplug client node") .isSparkPlug(true) + .sparkPlugAttributesMetricNames(sparkPlugAttributesMetricNames) .transportPayloadType(TransportPayloadType.PROTOBUF) .build(); processBeforeTest(configProperties); @@ -123,6 +137,50 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte MqttConnAck connAckMsg = (MqttConnAck) response; Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); } + protected List connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices, long ts) throws Exception { + List devices = new ArrayList<>(); + clientWithCorrectNodeAccessTokenWithNDEATH(); + MetricDataType metricDataType = Int32; + String key = "Node Metric int32"; + int valueDeviceInt32 = 1024; + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); + SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getBdSeqNum()); + payloadBirthNode.addMetrics(metric); + payloadBirthNode.setTimestamp(ts); + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + payloadBirthNode.build().toByteArray(), 0, false); + } + + valueDeviceInt32 = 4024; + metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32); + for (int i = 0; i < cntDevices; i++) { + SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getSeqNum()); + String deviceName = deviceId + "_" + i; + + payloadBirthDevice.addMetrics(metric); + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, + payloadBirthDevice.build().toByteArray(), 0, false); + AtomicReference device = new AtomicReference<>(); + await(alias + "find device [" + deviceName + "] after created") + .atMost(200, TimeUnit.SECONDS) + .until(() -> { + device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); + return device.get() != null; + }); + devices.add(device.get()); + } + + } + + Assert.assertEquals(cntDevices, devices.size()); + return devices; + } protected long getBdSeqNum() throws Exception { if (bdSeq == 256) { @@ -138,17 +196,16 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return seq++; } - - protected void connectionWithBirth(List listKeys, MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception { + protected List connectionWithNBirth(MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception { + List listKeys = new ArrayList<>(); SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long valueBdSec = getBdSeqNum(); payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64)); listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); - String keyRebirth = "Node Control/Rebirth"; - payloadBirthNode.addMetrics(createMetric(false, ts, keyRebirth, MetricDataType.Boolean)); - listKeys.add(keyRebirth); + payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean)); + listKeys.add(keyNodeRebirth); payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType)); listKeys.add(metricKey); @@ -157,6 +214,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } + return listKeys; } protected void createdAddMetricValuePrimitiveTsKv(List listTsKvEntry, List listKeys, @@ -369,11 +427,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte @Override public void messageArrived(String topic, MqttMessage mqttMsg) throws Exception { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(mqttMsg.getPayload()); - System.out.println("Message Arrived on topic " + topic); - for (SparkplugBProto.Payload.Metric metric : sparkplugBProtoNode.getMetricsList()) { - System.out.println("Metric: " + metric.toString()); - messageArrivedMetrics.add(metric); - } + messageArrivedMetrics.addAll(sparkplugBProtoNode.getMetricsList()); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 57b14471cc..8cfd4f4547 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -15,22 +15,24 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.attributes; +import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; +import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; /** * Created by nickAS21 on 12.01.23 @@ -38,26 +40,27 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp @Slf4j public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends AbstractMqttV5ClientSparkplugTest { - protected ThreadLocalRandom random = ThreadLocalRandom.current(); + /** + * "sparkPlugAttributesMetricNames": ["SN node", "SN device", "Firmware version", "Date version", "Last date update"] + * @throws Exception + */ protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); - connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32()); + List listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); // Shared attribute "Node Control/Rebirth" = true. type = NCMD. - String key = "Node Control/Rebirth"; boolean value = true; - Assert.assertTrue(listKeys.contains(key)); - String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + key + "\":" + value + "}"; + Assert.assertTrue(listKeys.contains(keyNodeRebirth)); + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}"; Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) .until(() -> { return mqttCallback.getMessageArrivedMetrics().size() == 1; }); - Assert.assertEquals(key, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(keyNodeRebirth, mqttCallback.getMessageArrivedMetrics().get(0).getName()); Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); } @@ -70,13 +73,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra */ protected void processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Boolean; String metricKey = "MyBoolean"; Object metricValue = nextBoolean(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Boolean <-> String boolean expectedValue = true; @@ -134,13 +136,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = UInt32; String metricKey = "MyLong"; Object metricValue = nextUInt32(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Long <-> String String valueStr = "123"; @@ -186,13 +187,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Float; String metricKey = "MyFloat"; Object metricValue = nextFloat(30, 400); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Float <-> String String valueStr = "123.345"; @@ -238,13 +238,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Double; String metricKey = "MyDouble"; Object metricValue = nextDouble(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Double <-> String String valueStr = "123345456"; @@ -290,13 +289,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.String; String metricKey = "MyString"; Object metricValue = nextString(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // String <-> Long long valueLong = 123345456L; @@ -340,4 +338,99 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue()); } + protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + + // Integer <-> Integer + int expectedValueInt = 123456; + + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + expectedValueInt + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + + // Int <-> String + String valueStr = "123"; + long expectedValue = Long.valueOf(valueStr); + + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueStr + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + // Int <-> Boolean + Boolean valueBoolean = true; + expectedValue = 1; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + valueBoolean = false; + expectedValue = 0; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + AtomicReference> actualKeys = new AtomicReference<>(); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { + })); + return actualKeys.get().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); + } + + protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + AtomicReference> actualKeys = new AtomicReference<>(); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { + })); + return actualKeys.get().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java new file mode 100644 index 0000000000..c16ad7af17 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.sparkplug.attributes; + +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.dao.service.DaoSqlTest; + +import java.util.HashSet; + +/** + * Created by nickAS21 on 12.01.23 + */ +@DaoSqlTest +public class MqttV5ClientSparkplugBAttributesInProfileTest extends AbstractMqttV5ClientSparkplugAttributesTest { + + @Before + public void beforeTest() throws Exception { + sparkPlugAttributesMetricNames = new HashSet<>(); + sparkPlugAttributesMetricNames.add(metricBirthName_Int32); + beforeSparkplugTest(); + } + + @After + public void afterTest () throws MqttException { + if (client.isConnected()) { + client.disconnect(); } + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); + } + +} \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java index f44bbc387a..a3ebc13b5b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java @@ -68,4 +68,14 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk(); } + @Test + public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { + processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { + processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk(); + } + } \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index f8a4373036..fca636d969 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -22,31 +22,33 @@ import org.junit.Assert; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; -import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; /** * Created by nickAS21 on 12.01.23 */ @Slf4j -public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest { +public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends AbstractMqttV5ClientSparkplugTest { protected void processClientWithCorrectNodeAccessTokenWithNDEATH_Test() throws Exception { - long ts = calendar.getTimeInMillis()-PUBLISH_TS_DELTA_MS; + long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long value = bdSeq = 0; clientWithCorrectNodeAccessTokenWithNDEATH(ts, value); @@ -69,38 +71,97 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstr String expectedMessage = "Server unavailable."; int expectedReasonCode = 136; Assert.assertEquals(expectedMessage, actualException.getMessage()); - Assert.assertEquals( expectedReasonCode, actualException.getReasonCode()); + Assert.assertEquals(expectedReasonCode, actualException.getReasonCode()); } protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception { - clientWithCorrectNodeAccessTokenWithNDEATH(); long ts = calendar.getTimeInMillis(); - MetricDataType metricDataType = Int32; - Set deviceIds = new HashSet<>(); - String key = "Device Metric int32"; - int valueDeviceInt32 = 1024; - SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); - for (int i=0; i < cntDevices; i++ ) { - SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() - .setTimestamp(calendar.getTimeInMillis()) - .setSeq(getSeqNum()); - String deviceName = deviceId + "_" + i; + connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); + } - payloadBirthDevice.addMetrics(metric); - if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, - payloadBirthDevice.build().toByteArray(), 0, false); - AtomicReference device = new AtomicReference<>(); - await(alias + "find device [" + deviceName + "] after created") - .atMost(200, TimeUnit.SECONDS) + protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(int cntDevices) throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); + + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), ONLINE.name())); + AtomicReference>> finalFuture = new AtomicReference<>(); + await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); + return finalFuture.get().get().contains(tsKvEntry); + }); + + for (Device device : devices) { + await(alias + messageName(STATE) + ", device: " + device.getName()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); + return finalFuture.get().get().contains(tsKvEntry); + }); + } + } + + protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(int cntDevices, int indexDeviceDisconnect) throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); + + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name())); + AtomicReference>> finalFuture = new AtomicReference<>(); + + SparkplugBProto.Payload.Builder payloadDeathDevice = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getSeqNum()); + if (client.isConnected()) { + List devicesList = new ArrayList<>(devices); + Device device = devicesList.get(indexDeviceDisconnect); + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DDEATH.name() + "/" + edgeNode + "/" + device.getName(), + payloadDeathDevice.build().toByteArray(), 0, false); + await(alias + messageName(STATE) + ", device: " + device.getName()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); + return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); + }); + } + } + + protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(int cntDevices) throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); + + TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(messageName(STATE), OFFLINE.name())); + AtomicReference>> finalFuture = new AtomicReference<>(); + + if (client.isConnected()) { + client.disconnect(); + + await(alias + messageName(STATE) + ", device: " + savedGateway.getName()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + finalFuture.set(tsService.findAllLatest(tenantId, savedGateway.getId())); + return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); + }); + + List devicesList = new ArrayList<>(devices); + for (Device device : devicesList) { + await(alias + messageName(STATE) + ", device: " + device.getName()) + .atMost(40, TimeUnit.SECONDS) .until(() -> { - device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); - return device.get() != null; + finalFuture.set(tsService.findAllLatest(tenantId, device.getId())); + return findEqualsKeyValueInKvEntrys(finalFuture.get().get(), tsKvEntry); }); } - deviceIds.add(deviceName); } - Assert.assertEquals(cntDevices, deviceIds.size()); + } + + private boolean findEqualsKeyValueInKvEntrys(List finalFuture, TsKvEntry tsKvEntry) { + for (TsKvEntry kvEntry : finalFuture) { + if (kvEntry.getKey().equals(tsKvEntry.getKey()) && kvEntry.getValue().equals(tsKvEntry.getValue())) { + return true; + } + } + return false; } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java index 6c928358ef..b07c3f3d8d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/MqttV5ClientSparkplugBConnectionTest.java @@ -60,4 +60,19 @@ public class MqttV5ClientSparkplugBConnectionTest extends AbstractMqttV5ClientSp processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(2); } + @Test + public void testClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL() throws Exception { + processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(3); + } + + @Test + public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE() throws Exception { + processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(3, 1); + } + + @Test + public void testConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All() throws Exception { + processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(3); + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java new file mode 100644 index 0000000000..f588af15ea --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java @@ -0,0 +1,107 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.sparkplug.rpc; + +import io.netty.handler.codec.mqtt.MqttQoS; +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.Test; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; + +@Slf4j +public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest { + + private static final int metricBirthValue_Int32 = 123456; + private static final String sparkplugRpcRequest = "{\"metricName\":\"" + metricBirthName_Int32 + "\",\"value\":" + metricBirthValue_Int32 + "}"; + + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}"; + String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway); + await(alias + SparkplugMessageType.NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(expected, actual); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + @Test + public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}"; + String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0)); + await(alias + NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(expected, actual); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String invalidateTypeMessageName = "RCMD"; + String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " + + invalidateTypeMessageName + "{\\\"metricName\\\":\\\"" + metricBirthName_Int32 + "\\\",\\\"value\\\":" + metricBirthValue_Int32 + "}\"}"; + String actual = sendRPCSparkplug(invalidateTypeMessageName, sparkplugRpcRequest, savedGateway); + Assert.assertEquals(expected, actual); + } + + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String metricNameBad = metricBirthName_Int32 + "_Bad"; + String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}"; + String expected = "{\"result\":\"BAD_REQUEST_PARAMS\",\"error\":\"Failed send To Node Rpc Request: " + + DCMD.name() + ". This node does not have a metricName: [" + metricNameBad + "]\"}"; + String actual = sendRPCSparkplug(DCMD.name(), sparkplugRpcRequestBad, savedGateway); + Assert.assertEquals(expected, actual); + } + + private String sendRPCSparkplug(String nameTypeMessage, String keyValue, Device device) throws Exception { + String setRpcRequest = "{\"method\": \"" + nameTypeMessage + "\", \"params\": " + keyValue + "}"; + return doPostAsync("/api/plugins/rpc/twoway/" + device.getId().getId(), setRpcRequest, String.class, status().isOk()); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java new file mode 100644 index 0000000000..c1bd340352 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.sparkplug.rpc; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.dao.service.DaoSqlTest; + +@DaoSqlTest +@Slf4j +public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest { + + @Before + public void beforeTest() throws Exception { + beforeSparkplugTest(); + } + + @After + public void afterTest() throws MqttException { + if (client.isConnected()) { + client.disconnect(); + } + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success(); + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index 25fedb43fb..e1939b70ec 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; /** * Created by nickAS21 on 12.01.23 @@ -39,8 +38,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); - connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32()); + List listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); Assert.assertTrue("Connection node is failed", client.isConnected()); AtomicReference>> finalFuture = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 5a8927e184..0e4fee8f42 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -43,6 +43,8 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; +import org.eclipse.leshan.core.ResponseCode; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -82,6 +84,8 @@ import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcResponseBody; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.net.ssl.SSLPeerUnverifiedException; @@ -110,8 +114,11 @@ import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NDEATH; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.parseTopicPublish; /** @@ -394,12 +401,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement // TODO break; case NBIRTH: - sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList()); - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); - break; case NCMD: case NDATA: - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; case NRECORD: // TODO @@ -416,7 +420,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case DBIRTH: case DCMD: case DDATA: - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); break; /** * TODO @@ -794,12 +798,21 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement registerSubQoS(topic, grantedQoSList, reqQoS); } - public void processAttributesSubscribe(List grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) { + private void processAttributesSubscribe(List grantedQoSList, String topic, MqttQoS reqQoS, TopicType topicType) { transportService.process(deviceSessionCtx.getSessionInfo(), TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); attrSubTopicType = topicType; registerSubQoS(topic, grantedQoSList, reqQoS); } + public void processAttributesRpcSubscribeSparkplugNode(List grantedQoSList, MqttQoS reqQoS) { + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() + .setSessionInfo(deviceSessionCtx.getSessionInfo()) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) + .build(), null); + registerSubQoS(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, grantedQoSList, reqQoS); + } + public void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { grantedQoSList.add(getMinSupportedQos(reqQoS)); mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS)); @@ -1088,7 +1101,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); - sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, + sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); } else { log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName()); @@ -1129,8 +1142,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionHandler.onDevicesDisconnect(); } if (sparkplugSessionHandler != null) { - // add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp() - sparkplugSessionHandler.stateSparkplugtSendOnTelemetry(deviceSessionCtx.getSessionInfo(), + // add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp() + sparkplugSessionHandler.sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(), deviceSessionCtx.getDeviceInfo().getDeviceName(), OFFLINE, new Date().getTime()); sparkplugSessionHandler.onDevicesDisconnect(); } @@ -1139,7 +1152,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement deviceSessionCtx.release(); } - private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { if (!msg.hasDeviceInfo()) { context.onAuthFailure(address); @@ -1206,8 +1218,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to device", sessionId); - String topic = attrSubTopicType.getAttributesSubTopic(); - MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType); try { if (sparkplugSessionHandler != null) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); @@ -1222,6 +1232,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } }); } else { + String topic = attrSubTopicType.getAttributesSubTopic(); + MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType); adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } } catch (Exception e) { @@ -1239,39 +1251,77 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { log.trace("[{}] Received RPC command to device", sessionId); - String baseTopic = rpcSubTopicType.getRpcRequestTopicBase(); - MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType); try { - adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> { - int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); - if (isAckExpected(payload)) { - rpcAwaitingAck.put(msgId, rpcRequest); - context.getScheduler().schedule(() -> { - TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId); - if (msg != null) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); - } - }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); + if (sparkplugSessionHandler != null) { + /** + * NCMD {"metricName":"MyNodeMetric05_String","value":"MyNodeMetric05_String_Value"} + * NCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444} + * NCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333} + * NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} + * NCMD {"metricName":"Node Control/Rebirth","value":false} + * NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} + */ + SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); + if (messageType == null) { + this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + return; } - var cf = publish(payload, deviceSessionCtx); - cf.addListener(result -> { - if (result.cause() == null) { - if (!isAckExpected(payload)) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); - } else if (rpcRequest.getPersisted()) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); - } - } else { - // TODO: send error - } - }); - }); + SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); + header.setMessageType(messageType.name()); + TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); + if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), + messageType); + sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) + .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); + } else { + sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " + + rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); + } + } else { + String baseTopic = rpcSubTopicType.getRpcRequestTopicBase(); + MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType); + adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic) + .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); + } } catch (Exception e) { - transportService.process(deviceSessionCtx.getSessionInfo(), - TransportProtos.ToDeviceRpcResponseMsg.newBuilder() - .setRequestId(rpcRequest.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY); log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); + this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, + "Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + } + } + + public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { + int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); + if (isAckExpected(payload)) { + rpcAwaitingAck.put(msgId, rpcRequest); + context.getScheduler().schedule(() -> { + TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId); + if (msg != null) { + transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); + } + }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); } + var cf = publish(payload, deviceSessionCtx); + cf.addListener(result -> { + if (result.cause() == null) { + if (!isAckExpected(payload)) { + transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } else if (rpcRequest.getPersisted()) { + transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); + } + this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); + } else { + log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName()); + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); + } + }); } @Override @@ -1311,4 +1361,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.close(); } + public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { + String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.name()).error(errorMsg).build()); + TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build(); + transportService.process(sessionInfo, msg, null); + } + + public void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) { + String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.getName()).result(successMsg).build()); + TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build(); + transportService.process(sessionInfo, msg, null); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index ba8036bb9d..c8a0c940b2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -54,7 +54,7 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.util.ReturnCode; -import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import javax.annotation.Nullable; import java.util.ArrayList; @@ -75,8 +75,9 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.STATE; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.OFFLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.messageName; /** * Created by ashvayka on 19.01.17. @@ -246,45 +247,45 @@ public abstract class AbstractGatewaySessionHandler { if (future != null) { return future; } - try { - transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() - .setDeviceName(deviceName) - .setDeviceType(deviceType) - .setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits()) - .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()) - .setSparkplug(this.deviceSessionCtx.isSparkplug()) - .build(), - new TransportServiceCallback<>() { - @Override - public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { - AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg); - if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { - log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); - SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); - transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); - transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() - .setSessionInfo(deviceSessionInfo) - .setSessionEvent(SESSION_EVENT_MSG_OPEN) - .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) - .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) - .build(), null); - } - futureToSet.set(devices.get(deviceName)); - deviceFutures.remove(deviceName); + try { + transportService.process(GetOrCreateDeviceFromGatewayRequestMsg.newBuilder() + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .setGatewayIdMSB(gateway.getDeviceId().getId().getMostSignificantBits()) + .setGatewayIdLSB(gateway.getDeviceId().getId().getLeastSignificantBits()) + .setSparkplug(this.deviceSessionCtx.isSparkplug()) + .build(), + new TransportServiceCallback<>() { + @Override + public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) { + AbstractGatewayDeviceSessionContext deviceSessionCtx = newDeviceSessionCtx(msg); + if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { + log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); + SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); + transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); + transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() + .setSessionInfo(deviceSessionInfo) + .setSessionEvent(SESSION_EVENT_MSG_OPEN) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) + .build(), null); } + futureToSet.set(devices.get(deviceName)); + deviceFutures.remove(deviceName); + } - @Override - public void onError(Throwable e) { - log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); - futureToSet.setException(e); - deviceFutures.remove(deviceName); - } - }); - return futureToSet; - } catch (Throwable e) { - deviceFutures.remove(deviceName); - throw e; - } + @Override + public void onError(Throwable e) { + log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e); + futureToSet.setException(e); + deviceFutures.remove(deviceName); + } + }); + return futureToSet; + } catch (Throwable e) { + deviceFutures.remove(deviceName); + throw e; + } } protected abstract AbstractGatewayDeviceSessionContext newDeviceSessionCtx(GetOrCreateDeviceFromGatewayResponse msg); @@ -557,7 +558,7 @@ public abstract class AbstractGatewaySessionHandler { } } - private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); } @@ -727,23 +728,23 @@ public abstract class AbstractGatewaySessionHandler { } private void deregisterSession(String deviceName, MqttDeviceAwareSessionContext deviceSessionCtx) { - if (this.deviceSessionCtx.isSparkplug()){ + if (this.deviceSessionCtx.isSparkplug()) { // add Msg Telemetry: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp() - stateSparkplugtSendOnTelemetry (deviceSessionCtx.getSessionInfo(), + sendSparkplugStateOnTelemetry(deviceSessionCtx.getSessionInfo(), deviceSessionCtx.getDeviceInfo().getDeviceName(), OFFLINE, new Date().getTime()); } transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); - System.out.println("Removed device " + deviceName + " from the gateway session"); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); } - public void stateSparkplugtSendOnTelemetry (TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugMessageTypeSate typeSate, long ts) { + public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState connectionState, long ts) { TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder(); - keyValueProtoBuilder.setKey(STATE.name()); + keyValueProtoBuilder.setKey(messageName(STATE)); keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V); - keyValueProtoBuilder.setStringV(typeSate.name()); - TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts); + keyValueProtoBuilder.setStringV(connectionState.name()); + TransportProtos.PostTelemetryMsg postTelemetryMsg = postTelemetryMsgCreated(keyValueProtoBuilder.build(), ts); + transportService.process(sessionInfo, postTelemetryMsg, getPubAckCallback(channel, deviceName, -1, postTelemetryMsg)); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index 06a398b9c1..7e8e7421e4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -16,16 +16,23 @@ package org.thingsboard.server.transport.mqtt.session; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; +import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; +import java.util.Date; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; + @Slf4j public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext { @@ -53,4 +60,44 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC }); } + @Override + public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { + log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId); + try { + /** + * DCMD {"metricName":"MyDeviceMetricText","value":"MyNodeMetric05_String_Value"} + * DCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444} + * DCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333} + * DCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} + * DCMD {"metricName":"Node Control/Rebirth","value":false} + * DCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} + */ + SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); + if (messageType == null) { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + return; + } + SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); + header.setMessageType(messageType.name()); + TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); + if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), + messageType, deviceInfo.getDeviceName()); + parent.createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) + .ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo)); + } else { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + + rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); + } + } catch (ThingsboardException e) { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + + rpcRequest.getMethodName() + ". " + e.getMessage()); + } + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index c52c5e33a8..041a32f461 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -21,21 +21,24 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.protobuf.Descriptors; +import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.eclipse.leshan.core.ResponseCode; +import org.springframework.util.CollectionUtils; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; +import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; -import org.thingsboard.server.transport.mqtt.TopicType; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; @@ -44,6 +47,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -51,7 +55,7 @@ import java.util.stream.Collectors; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DBIRTH; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NBIRTH; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageTypeSate.ONLINE; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.fromSparkplugBMetricToKeyValueProto; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.validatedValueByTypeMetric; @@ -96,37 +100,47 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { + public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { checkDeviceName(deviceName); ListenableFuture contextListenableFuture = topic.isNode() ? Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); - List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); - if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { - try { + try { + if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { // add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp() - stateSparkplugtSendOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE, + sendSparkplugStateOnTelemetry(contextListenableFuture.get().getSessionInfo(), deviceName, ONLINE, sparkplugBProto.getTimestamp()); - contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed add Metrics. MessageType *BIRTH.", e); } + if (topic.isType(NBIRTH)) { + setNodeBirthMetrics(sparkplugBProto.getMetricsList()); + } else if (topic.isType(DBIRTH)) { + contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList()); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Failed add Metrics or change SparkplugConnectionState. MessageType *BIRTH.", e); } - onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName); + Set attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx + .getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames(); + if (attributesMetricNames != null) { + List attributesMsgList = convertToPostAttributes(sparkplugBProto, attributesMetricNames, deviceName); + onDeviceAttributesProto(contextListenableFuture, msgId, attributesMsgList, deviceName); + } + List postTelemetryMsgList = convertToPostTelemetry(sparkplugBProto, attributesMetricNames, topic.getType().name()); + onDeviceTelemetryProto(contextListenableFuture, msgId, postTelemetryMsgList, deviceName); } public void onDeviceTelemetryProto(ListenableFuture contextListenableFuture, - int msgId, List msgs, String deviceName) throws AdaptorException { + int msgId, List postTelemetryMsgList, String deviceName) throws AdaptorException { try { int finalMsgId = msgId; - for (TransportProtos.PostTelemetryMsg msg : msgs) { + postTelemetryMsgList.forEach(telemetryMsg -> { Futures.addCallback(contextListenableFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { try { - processPostTelemetryMsg(deviceCtx, msg, deviceName, finalMsgId); + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId); } catch (Throwable e) { - log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, msg, e); + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); channel.close(); } } @@ -136,6 +150,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); } }, context.getExecutor()); + }); + } catch (RuntimeException e) { + throw new AdaptorException(e); + } + } + + private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, + List attributesMsgList, String deviceName) throws AdaptorException { + try { + if (!CollectionUtils.isEmpty(attributesMsgList)) { + attributesMsgList.forEach(attributesMsg -> { + Futures.addCallback(contextListenableFuture, + new FutureCallback<>() { + @Override + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { + TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); + try { + TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray()); + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); + } catch (Throwable e) { + log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); + } + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); + } + }, context.getExecutor()); + }); + } else { + log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); } } catch (RuntimeException e) { throw new AdaptorException(e); @@ -152,7 +198,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { // TODO SUBSCRIBE GroupId } else if (sparkplugTopic.isNode()) { // SUBSCRIBE Node - parent.processAttributesSubscribe(grantedQoSList, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, reqQoS, TopicType.V1); + parent.processAttributesRpcSubscribeSparkplugNode(grantedQoSList, reqQoS); } else { // SUBSCRIBE Device - DO NOTHING, WE HAVE ALREADY SUBSCRIBED. // TODO: track that node subscribed to # or to particular device. @@ -177,16 +223,18 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - private List convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, String topicTypeName) throws AdaptorException { + private List convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, Set attributesMetricNames, String topicTypeName) throws AdaptorException { try { List msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - long ts = protoMetric.getTimestamp(); - String key = "bdSeq".equals(protoMetric.getName()) ? - topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); - Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); - if (keyValueProtoOpt.isPresent()) { - msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts)); + if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) { + long ts = protoMetric.getTimestamp(); + String key = "bdSeq".equals(protoMetric.getName()) ? + topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); + if (keyValueProtoOpt.isPresent()) { + msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts)); + } } } @@ -204,6 +252,39 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } + private List convertToPostAttributes(SparkplugBProto.Payload sparkplugBProto, + Set attributesMetricNames, + String deviceName) throws AdaptorException { + try { + List msgs = new ArrayList<>(); + for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { + if (attributesMetricNames.contains(protoMetric.getName())) { + TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); + Optional msgOpt = getPostAttributeMsg(protoMetric); + if (msgOpt.isPresent()) { + deviceAttributesMsgBuilder.setDeviceName(deviceName); + deviceAttributesMsgBuilder.setMsg(msgOpt.get()); + msgs.add(deviceAttributesMsgBuilder.build()); + } + } + } + return msgs; + } catch (IllegalStateException | JsonSyntaxException | ThingsboardException e) { + log.error("Failed to decode post telemetry request", e); + throw new AdaptorException(e); + } + } + + private Optional getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); + if (keyValueProtoOpt.isPresent()) { + TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder(); + builder.addKv(keyValueProtoOpt.get()); + return Optional.of(builder.build()); + } + return Optional.empty(); + } + public SparkplugTopic getSparkplugTopicNode() { return this.sparkplugTopicNode; } @@ -238,4 +319,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); } + protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { + parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo); + } + + protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { + parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg); + } + + protected void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) { + parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg); + } + + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java similarity index 96% rename from common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java rename to common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java index f9a18adbd3..daf2312e36 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageTypeSate.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugConnectionState.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.transport.mqtt.util.sparkplug; -public enum SparkplugMessageTypeSate { +public enum SparkplugConnectionState { /** * The EoN node should examine the payload of this * message to ensure that it is a value of “ONLINE” diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java index 8370d2273e..7bbcd3548f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMessageType.java @@ -87,6 +87,9 @@ public enum SparkplugMessageType { } throw new ThingsboardException("Invalid message type: " + type, ThingsboardErrorCode.INVALID_ARGUMENTS); } + public static String messageName(SparkplugMessageType type) { + return STATE.equals(type) ? "sparkplugConnectionState" : type.name(); + } public boolean isDeath() { return this.equals(DDEATH) || this.equals(NDEATH); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index 19ec3156e6..6137433029 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -189,6 +189,41 @@ public class SparkplugMetricUtil { return metric; } + public static TransportProtos.TsKvProto getTsKvProto(String key, Object value, long ts) throws ThingsboardException { + try { + TransportProtos.TsKvProto.Builder tsKvProtoBuilder = TransportProtos.TsKvProto.newBuilder(); + TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder(); + keyValueProtoBuilder.setKey(key); + if (value instanceof String) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.STRING_V); + keyValueProtoBuilder.setStringV((String) value); + } else if (value instanceof Integer) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V); + keyValueProtoBuilder.setLongV((Integer) value); + } else if (value instanceof Long) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.LONG_V); + keyValueProtoBuilder.setLongV((Long) value); + } else if (value instanceof Boolean) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V); + keyValueProtoBuilder.setBoolV((Boolean) value); + } else if (value instanceof Double) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V); + keyValueProtoBuilder.setDoubleV((Double) value); + } else if (value instanceof List) { + keyValueProtoBuilder.setType(TransportProtos.KeyValueType.JSON_V); + ArrayNode arrayNodeBytes = JacksonUtil.convertValue(value, ArrayNode.class); + keyValueProtoBuilder.setJsonV(arrayNodeBytes.toString()); + } else { + throw new ThingsboardException("Failed to convert device/node RPC command to TsKvProto for Sparkplug MQT msg: value [" + value + "]", ThingsboardErrorCode.INVALID_ARGUMENTS); + } + tsKvProtoBuilder.setKv(keyValueProtoBuilder.build()); + tsKvProtoBuilder.setTs(ts); + return tsKvProtoBuilder.build(); + } catch (Exception e) { + throw new ThingsboardException("Failed to convert device/node RPC command to TsKvProto for Sparkplug MQT msg: value [" + value + "]", ThingsboardErrorCode.INVALID_ARGUMENTS); + } + } + public static Optional validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException { if (kv.getTypeValue() <= 3) { return validatedValuePrimitiveByTypeMetric(kv, metricDataType); @@ -214,8 +249,8 @@ public class SparkplugMetricUtil { case UInt8: case UInt16: case Int32: - Optional boolInt8 = booleanStringToInt (valueOpt.get()); - if(boolInt8.isPresent()) { + Optional boolInt8 = booleanStringToInt(valueOpt.get()); + if (boolInt8.isPresent()) { return Optional.of(boolInt8.get()); } try { @@ -233,25 +268,25 @@ public class SparkplugMetricUtil { case Int64: case UInt64: case DateTime: - Optional boolInt64 = booleanStringToInt (valueOpt.get()); - if(boolInt64.isPresent()) { + Optional boolInt64 = booleanStringToInt(valueOpt.get()); + if (boolInt64.isPresent()) { return Optional.of(Long.valueOf(boolInt64.get())); } var l = new BigDecimal(valueOpt.get()); return Optional.of(l.longValue()); - // float + // float case Float: - Optional boolFloat = booleanStringToInt (valueOpt.get()); - if(boolFloat.isPresent()) { + Optional boolFloat = booleanStringToInt(valueOpt.get()); + if (boolFloat.isPresent()) { var fb = new BigDecimal(boolFloat.get()); return Optional.of(fb.floatValue()); } var f = new BigDecimal(valueOpt.get()); return Optional.of(f.floatValue()); - // double + // double case Double: - Optional boolDouble = booleanStringToInt (valueOpt.get()); - if(boolDouble.isPresent()) { + Optional boolDouble = booleanStringToInt(valueOpt.get()); + if (boolDouble.isPresent()) { return Optional.of(Double.valueOf(boolDouble.get())); } var dd = new BigDecimal(valueOpt.get()); @@ -272,7 +307,7 @@ public class SparkplugMetricUtil { case String: case Text: case UUID: - return Optional.of(valueOpt.get()); + return Optional.of(valueOpt.get()); } } catch (Exception e) { log.trace("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); @@ -282,15 +317,16 @@ public class SparkplugMetricUtil { return Optional.empty(); } - public static Optional validatedValueJsonByTypeMetric(String arrayNodeStr, MetricDataType metricDataType) { + public static Optional validatedValueJsonByTypeMetric(String arrayNodeStr, MetricDataType metricDataType) { try { Optional valueOpt; switch (metricDataType) { // byte[] case Bytes: - List listBytes = JacksonUtil.fromString(arrayNodeStr, new TypeReference<>() {}); + List listBytes = JacksonUtil.fromString(arrayNodeStr, new TypeReference<>() { + }); byte[] bytes = new byte[listBytes.size()]; - for(int i = 0; i < listBytes.size(); i++) { + for (int i = 0; i < listBytes.size(); i++) { bytes[i] = listBytes.get(i).byteValue(); } return Optional.of(bytes); @@ -324,7 +360,7 @@ public class SparkplugMetricUtil { } } - private static Optional booleanStringToInt (String booleanStr) { + private static Optional booleanStringToInt(String booleanStr) { if ("true".equals(booleanStr)) { return Optional.of(1); } else if ("false".equals(booleanStr)) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcRequestHeader.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcRequestHeader.java new file mode 100644 index 0000000000..47b7cb6dd2 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcRequestHeader.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class SparkplugRpcRequestHeader { + + private String messageType; + private String metricName; + private Object value; + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcResponseBody.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcResponseBody.java new file mode 100644 index 0000000000..d120190588 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugRpcResponseBody.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.util.sparkplug; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SparkplugRpcResponseBody { + + private String result; + private String value; + private String error; + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 1ec98aac9f..f5fa34e218 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -46,7 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { protected volatile DeviceProfile deviceProfile; @Getter @Setter - private volatile TransportProtos.SessionInfoProto sessionInfo; + protected volatile TransportProtos.SessionInfoProto sessionInfo; @Setter private volatile boolean connected;