diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java index 28e458d971..05ebedd6e8 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/AbstractMqttIntegrationTest.java @@ -104,6 +104,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(config.getAttributesTopicFilter()); } mqttDeviceProfileTransportConfiguration.setSparkPlug(config.isSparkPlug()); + mqttDeviceProfileTransportConfiguration.setSparkPlugAttributesMetricNames(config.sparkPlugAttributesMetricNames); mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(config.isSendAckOnValidationException()); TransportPayloadTypeConfiguration transportPayloadTypeConfiguration; if (TransportPayloadType.JSON.equals(transportPayloadType)) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java index 1a2b6eefa4..599b18ff17 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/MqttTestConfigProperties.java @@ -20,6 +20,8 @@ import lombok.Data; import org.thingsboard.server.common.data.DeviceProfileProvisionType; import org.thingsboard.server.common.data.TransportPayloadType; +import java.util.Set; + @Data @Builder public class MqttTestConfigProperties { @@ -27,6 +29,7 @@ public class MqttTestConfigProperties { String deviceName; String gatewayName; boolean isSparkPlug; + Set sparkPlugAttributesMetricNames; TransportPayloadType transportPayloadType; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index e949888733..00bf9c7c3a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -28,6 +28,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -48,8 +49,12 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static org.awaitility.Awaitility.await; import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK; import static org.thingsboard.common.util.JacksonUtil.newArrayNode; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; @@ -84,10 +89,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte protected int seq = 0; protected static final long PUBLISH_TS_DELTA_MS = 86400000;// Publish start TS <-> 24h + // NBIRTH + protected static final String keyNodeRebirth = "Node Control/Rebirth"; + + //*BIRTH + protected static final MetricDataType metricBirthDataType_Int32 = Int32; + protected static final String metricBirthName_Int32 = "Device Metric int32"; + protected Set sparkPlugAttributesMetricNames; + public void beforeSparkplugTest() throws Exception { MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .gatewayName("Test Connect Sparkplug client node") .isSparkPlug(true) + .sparkPlugAttributesMetricNames(sparkPlugAttributesMetricNames) .transportPayloadType(TransportPayloadType.PROTOBUF) .build(); processBeforeTest(configProperties); @@ -123,6 +137,50 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte MqttConnAck connAckMsg = (MqttConnAck) response; Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode()); } + protected List connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices, long ts) throws Exception { + List devices = new ArrayList<>(); + clientWithCorrectNodeAccessTokenWithNDEATH(); + MetricDataType metricDataType = Int32; + String key = "Node Metric int32"; + int valueDeviceInt32 = 1024; + SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); + SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getBdSeqNum()); + payloadBirthNode.addMetrics(metric); + payloadBirthNode.setTimestamp(ts); + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, + payloadBirthNode.build().toByteArray(), 0, false); + } + + valueDeviceInt32 = 4024; + metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32); + for (int i = 0; i < cntDevices; i++) { + SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts) + .setSeq(getSeqNum()); + String deviceName = deviceId + "_" + i; + + payloadBirthDevice.addMetrics(metric); + if (client.isConnected()) { + client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, + payloadBirthDevice.build().toByteArray(), 0, false); + AtomicReference device = new AtomicReference<>(); + await(alias + "find device [" + deviceName + "] after created") + .atMost(200, TimeUnit.SECONDS) + .until(() -> { + device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); + return device.get() != null; + }); + devices.add(device.get()); + } + + } + + Assert.assertEquals(cntDevices, devices.size()); + return devices; + } protected long getBdSeqNum() throws Exception { if (bdSeq == 256) { @@ -138,17 +196,16 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte return seq++; } - - protected void connectionWithBirth(List listKeys, MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception { + protected List connectionWithNBirth(MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception { + List listKeys = new ArrayList<>(); SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() .setTimestamp(calendar.getTimeInMillis()); long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS; long valueBdSec = getBdSeqNum(); payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64)); listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq); - String keyRebirth = "Node Control/Rebirth"; - payloadBirthNode.addMetrics(createMetric(false, ts, keyRebirth, MetricDataType.Boolean)); - listKeys.add(keyRebirth); + payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean)); + listKeys.add(keyNodeRebirth); payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType)); listKeys.add(metricKey); @@ -157,6 +214,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, payloadBirthNode.build().toByteArray(), 0, false); } + return listKeys; } protected void createdAddMetricValuePrimitiveTsKv(List listTsKvEntry, List listKeys, @@ -369,11 +427,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte @Override public void messageArrived(String topic, MqttMessage mqttMsg) throws Exception { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(mqttMsg.getPayload()); - System.out.println("Message Arrived on topic " + topic); - for (SparkplugBProto.Payload.Metric metric : sparkplugBProtoNode.getMetricsList()) { - System.out.println("Metric: " + metric.toString()); - messageArrivedMetrics.add(metric); - } + messageArrivedMetrics.addAll(sparkplugBProtoNode.getMetricsList()); } @Override diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 15427099ce..8cfd4f4547 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -15,22 +15,24 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.attributes; +import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; +import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; /** * Created by nickAS21 on 12.01.23 @@ -38,8 +40,6 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp @Slf4j public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends AbstractMqttV5ClientSparkplugTest { - protected ThreadLocalRandom random = ThreadLocalRandom.current(); - /** * "sparkPlugAttributesMetricNames": ["SN node", "SN device", "Firmware version", "Date version", "Last date update"] * @throws Exception @@ -47,22 +47,20 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); - connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32()); + List listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); // Shared attribute "Node Control/Rebirth" = true. type = NCMD. - String key = "Node Control/Rebirth"; boolean value = true; - Assert.assertTrue(listKeys.contains(key)); - String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + key + "\":" + value + "}"; + Assert.assertTrue(listKeys.contains(keyNodeRebirth)); + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}"; Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) .until(() -> { return mqttCallback.getMessageArrivedMetrics().size() == 1; }); - Assert.assertEquals(key, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(keyNodeRebirth, mqttCallback.getMessageArrivedMetrics().get(0).getName()); Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue()); } @@ -75,13 +73,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra */ protected void processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Boolean; String metricKey = "MyBoolean"; Object metricValue = nextBoolean(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Boolean <-> String boolean expectedValue = true; @@ -139,13 +136,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = UInt32; String metricKey = "MyLong"; Object metricValue = nextUInt32(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Long <-> String String valueStr = "123"; @@ -191,13 +187,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Float; String metricKey = "MyFloat"; Object metricValue = nextFloat(30, 400); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Float <-> String String valueStr = "123.345"; @@ -243,13 +238,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.Double; String metricKey = "MyDouble"; Object metricValue = nextDouble(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // Double <-> String String valueStr = "123345456"; @@ -295,13 +289,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); MetricDataType metricDataType = MetricDataType.String; String metricKey = "MyString"; Object metricValue = nextString(); - connectionWithBirth(listKeys, metricDataType, metricKey, metricValue); + connectionWithNBirth(metricDataType, metricKey, metricValue); Assert.assertTrue("Connection node is failed", client.isConnected()); - client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); // String <-> Long long valueLong = 123345456L; @@ -345,4 +338,99 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue()); } + protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + + // Integer <-> Integer + int expectedValueInt = 123456; + + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + expectedValueInt + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + + // Int <-> String + String valueStr = "123"; + long expectedValue = Long.valueOf(valueStr); + + String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueStr + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + // Int <-> Boolean + Boolean valueBoolean = true; + expectedValue = 1; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + mqttCallback.deleteMessageArrivedMetrics(0); + + valueBoolean = false; + expectedValue = 0; + SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } + + protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + AtomicReference> actualKeys = new AtomicReference<>(); + await(alias + SparkplugMessageType.NBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { + })); + return actualKeys.get().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); + } + + protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + AtomicReference> actualKeys = new AtomicReference<>(); + await(alias + SparkplugMessageType.DBIRTH.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { + })); + return actualKeys.get().size() == 1; + }); + Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0)); + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java new file mode 100644 index 0000000000..c16ad7af17 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesInProfileTest.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.sparkplug.attributes; + +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.dao.service.DaoSqlTest; + +import java.util.HashSet; + +/** + * Created by nickAS21 on 12.01.23 + */ +@DaoSqlTest +public class MqttV5ClientSparkplugBAttributesInProfileTest extends AbstractMqttV5ClientSparkplugAttributesTest { + + @Before + public void beforeTest() throws Exception { + sparkPlugAttributesMetricNames = new HashSet<>(); + sparkPlugAttributesMetricNames.add(metricBirthName_Int32); + beforeSparkplugTest(); + } + + @After + public void afterTest () throws MqttException { + if (client.isConnected()) { + client.disconnect(); } + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { + processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes(); + } + +} \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java index f44bbc387a..a3ebc13b5b 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/MqttV5ClientSparkplugBAttributesTest.java @@ -68,4 +68,14 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk(); } + @Test + public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception { + processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception { + processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk(); + } + } \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java index abe562d4c4..49eed71e71 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/connection/AbstractMqttV5ClientSparkplugConnectionTest.java @@ -27,22 +27,17 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; -import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric; /** * Created by nickAS21 on 12.01.23 @@ -78,15 +73,13 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra } protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception { - Set devices = new HashSet<>(); long ts = calendar.getTimeInMillis(); - connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts); + connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); } protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(int cntDevices) throws Exception { - Set devices = new HashSet<>(); long ts = calendar.getTimeInMillis(); - connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), ONLINE.name())); AtomicReference>> finalFuture = new AtomicReference<>(); @@ -108,9 +101,8 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra } protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(int cntDevices, int indexDeviceDisconnect) throws Exception { - Set devices = new HashSet<>(); long ts = calendar.getTimeInMillis(); - connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name())); AtomicReference>> finalFuture = new AtomicReference<>(); @@ -133,9 +125,8 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra } protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(int cntDevices) throws Exception { - Set devices = new HashSet<>(); long ts = calendar.getTimeInMillis(); - connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts); TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name())); AtomicReference>> finalFuture = new AtomicReference<>(); @@ -162,50 +153,6 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra } } - private void connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(Set devices, int cntDevices, long ts) throws Exception { - clientWithCorrectNodeAccessTokenWithNDEATH(); - MetricDataType metricDataType = Int32; - String key = "Node Metric int32"; - int valueDeviceInt32 = 1024; - SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType); - SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder() - .setTimestamp(ts) - .setSeq(getBdSeqNum()); - payloadBirthNode.addMetrics(metric); - payloadBirthNode.setTimestamp(ts); - if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode, - payloadBirthNode.build().toByteArray(), 0, false); - } - metricDataType = Int32; - key = "Device Metric int32"; - valueDeviceInt32 = 4024; - metric = createMetric(valueDeviceInt32, ts, key, metricDataType); - for (int i = 0; i < cntDevices; i++) { - SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder() - .setTimestamp(ts) - .setSeq(getSeqNum()); - String deviceName = deviceId + "_" + i; - - payloadBirthDevice.addMetrics(metric); - if (client.isConnected()) { - client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName, - payloadBirthDevice.build().toByteArray(), 0, false); - AtomicReference device = new AtomicReference<>(); - await(alias + "find device [" + deviceName + "] after created") - .atMost(200, TimeUnit.SECONDS) - .until(() -> { - device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); - return device.get() != null; - }); - devices.add(device.get()); - } - - } - - Assert.assertEquals(cntDevices, devices.size()); - } - private boolean findEqualsKeyValueInKvEntrys(List finalFuture, TsKvEntry tsKvEntry) { for (TsKvEntry kvEntry : finalFuture) { if (kvEntry.getKey().equals(tsKvEntry.getKey()) && kvEntry.getValue().equals(tsKvEntry.getValue())) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java index dfdc2d9ea9..f588af15ea 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/AbstractMqttV5RpcSparkplugTest.java @@ -15,45 +15,93 @@ */ package org.thingsboard.server.transport.mqtt.sparkplug.rpc; -import com.nimbusds.jose.util.StandardCharset; +import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.mqttv5.common.MqttException; -import org.eclipse.paho.mqttv5.common.MqttMessage; -import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; -import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback; -import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; +import org.junit.Assert; +import org.junit.Test; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; @Slf4j -public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttIntegrationTest { +public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest { - private static final String DEVICE_RESPONSE = "{\"value1\":\"A\",\"value2\":\"B\"}"; - private static final String setSparklpugRpcNodeRequest = "{\"method\": \"NCMD\", \"params\": {\"MyNodeMetric05_String\":\"MyNodeMetric05_String_Value\"}}"; - private static final String setSparklpugRpcDeviceRequest = "{\"method\": \"DCMD\", \"params\": {\"MyDeviceMetric05_String\":{\"MyDeviceMetric05_String_Value\"}}"; + private static final int metricBirthValue_Int32 = 123456; + private static final String sparkplugRpcRequest = "{\"metricName\":\"" + metricBirthName_Int32 + "\",\"value\":" + metricBirthValue_Int32 + "}"; - protected class MqttV5TestRpcCallback extends MqttV5TestCallback { + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}"; + String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway); + await(alias + SparkplugMessageType.NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(expected, actual); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } - private final MqttV5TestClient client; + @Test + public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + long ts = calendar.getTimeInMillis(); + List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); + String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}"; + String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0)); + await(alias + NCMD.name()) + .atMost(40, TimeUnit.SECONDS) + .until(() -> { + return mqttCallback.getMessageArrivedMetrics().size() == 1; + }); + Assert.assertEquals(expected, actual); + Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName()); + Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue()); + } - public MqttV5TestRpcCallback(MqttV5TestClient client, String awaitSubTopic) { - super(awaitSubTopic); - this.client = client; - } + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String invalidateTypeMessageName = "RCMD"; + String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " + + invalidateTypeMessageName + "{\\\"metricName\\\":\\\"" + metricBirthName_Int32 + "\\\",\\\"value\\\":" + metricBirthValue_Int32 + "}\"}"; + String actual = sendRPCSparkplug(invalidateTypeMessageName, sparkplugRpcRequest, savedGateway); + Assert.assertEquals(expected, actual); + } - @Override - protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) { - log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic); - if (awaitSubTopic.equals(requestTopic)) { - qoS = mqttMessage.getQos(); - payloadBytes = mqttMessage.getPayload(); - String responseTopic = requestTopic.replace("request", "response"); - try { - client.publish(responseTopic, DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8)); - } catch (MqttException e) { - log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e); - } - subscribeLatch.countDown(); - } - } + @Test + public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { + clientWithCorrectNodeAccessTokenWithNDEATH(); + connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); + Assert.assertTrue("Connection node is failed", client.isConnected()); + client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE); + String metricNameBad = metricBirthName_Int32 + "_Bad"; + String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}"; + String expected = "{\"result\":\"BAD_REQUEST_PARAMS\",\"error\":\"Failed send To Node Rpc Request: " + + DCMD.name() + ". This node does not have a metricName: [" + metricNameBad + "]\"}"; + String actual = sendRPCSparkplug(DCMD.name(), sparkplugRpcRequestBad, savedGateway); + Assert.assertEquals(expected, actual); + } + + private String sendRPCSparkplug(String nameTypeMessage, String keyValue, Device device) throws Exception { + String setRpcRequest = "{\"method\": \"" + nameTypeMessage + "\", \"params\": " + keyValue + "}"; + return doPostAsync("/api/plugins/rpc/twoway/" + device.getId().getId(), setRpcRequest, String.class, status().isOk()); } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java new file mode 100644 index 0000000000..c1bd340352 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/rpc/MqttV5RpcSparkplugTest.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.sparkplug.rpc; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.dao.service.DaoSqlTest; + +@DaoSqlTest +@Slf4j +public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest { + + @Before + public void beforeTest() throws Exception { + beforeSparkplugTest(); + } + + @After + public void afterTest() throws MqttException { + if (client.isConnected()) { + client.disconnect(); + } + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success(); + } + + @Test + public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception { + processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success(); + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); + } + + @Test + public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception { + processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS(); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java index 25fedb43fb..e1939b70ec 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/timeseries/AbstractMqttV5ClientSparkplugTelemetryTest.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; -import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32; /** * Created by nickAS21 on 12.01.23 @@ -39,8 +38,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); - List listKeys = new ArrayList<>(); - connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32()); + List listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); Assert.assertTrue("Connection node is failed", client.isConnected()); AtomicReference>> finalFuture = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 1350ad8499..041a32f461 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2022 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.