sparkplug: tests rpc and attributes

This commit is contained in:
nickAS21 2023-02-16 18:02:50 +02:00
parent 1684b924e8
commit a80b3cb536
11 changed files with 397 additions and 132 deletions

View File

@ -104,6 +104,7 @@ public abstract class AbstractMqttIntegrationTest extends AbstractTransportInteg
mqttDeviceProfileTransportConfiguration.setDeviceAttributesTopic(config.getAttributesTopicFilter());
}
mqttDeviceProfileTransportConfiguration.setSparkPlug(config.isSparkPlug());
mqttDeviceProfileTransportConfiguration.setSparkPlugAttributesMetricNames(config.sparkPlugAttributesMetricNames);
mqttDeviceProfileTransportConfiguration.setSendAckOnValidationException(config.isSendAckOnValidationException());
TransportPayloadTypeConfiguration transportPayloadTypeConfiguration;
if (TransportPayloadType.JSON.equals(transportPayloadType)) {

View File

@ -20,6 +20,8 @@ import lombok.Data;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.util.Set;
@Data
@Builder
public class MqttTestConfigProperties {
@ -27,6 +29,7 @@ public class MqttTestConfigProperties {
String deviceName;
String gatewayName;
boolean isSparkPlug;
Set<String> sparkPlugAttributesMetricNames;
TransportPayloadType transportPayloadType;

View File

@ -28,6 +28,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -48,8 +49,12 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
@ -84,10 +89,19 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
protected int seq = 0;
protected static final long PUBLISH_TS_DELTA_MS = 86400000;// Publish start TS <-> 24h
// NBIRTH
protected static final String keyNodeRebirth = "Node Control/Rebirth";
//*BIRTH
protected static final MetricDataType metricBirthDataType_Int32 = Int32;
protected static final String metricBirthName_Int32 = "Device Metric int32";
protected Set<String> sparkPlugAttributesMetricNames;
public void beforeSparkplugTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.gatewayName("Test Connect Sparkplug client node")
.isSparkPlug(true)
.sparkPlugAttributesMetricNames(sparkPlugAttributesMetricNames)
.transportPayloadType(TransportPayloadType.PROTOBUF)
.build();
processBeforeTest(configProperties);
@ -123,6 +137,50 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
MqttConnAck connAckMsg = (MqttConnAck) response;
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
}
protected List<Device> connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices, long ts) throws Exception {
List<Device> devices = new ArrayList<>();
clientWithCorrectNodeAccessTokenWithNDEATH();
MetricDataType metricDataType = Int32;
String key = "Node Metric int32";
int valueDeviceInt32 = 1024;
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getBdSeqNum());
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
valueDeviceInt32 = 4024;
metric = createMetric(valueDeviceInt32, ts, metricBirthName_Int32, metricBirthDataType_Int32);
for (int i = 0; i < cntDevices; i++) {
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getSeqNum());
String deviceName = deviceId + "_" + i;
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
});
devices.add(device.get());
}
}
Assert.assertEquals(cntDevices, devices.size());
return devices;
}
protected long getBdSeqNum() throws Exception {
if (bdSeq == 256) {
@ -138,17 +196,16 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
return seq++;
}
protected void connectionWithBirth(List<String> listKeys, MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception {
protected List<String> connectionWithNBirth(MetricDataType metricDataType, String metricKey, Object metricValue) throws Exception {
List<String> listKeys = new ArrayList<>();
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(calendar.getTimeInMillis());
long ts = calendar.getTimeInMillis() - PUBLISH_TS_DELTA_MS;
long valueBdSec = getBdSeqNum();
payloadBirthNode.addMetrics(createMetric(valueBdSec, ts, keysBdSeq, Int64));
listKeys.add(SparkplugMessageType.NBIRTH.name() + " " + keysBdSeq);
String keyRebirth = "Node Control/Rebirth";
payloadBirthNode.addMetrics(createMetric(false, ts, keyRebirth, MetricDataType.Boolean));
listKeys.add(keyRebirth);
payloadBirthNode.addMetrics(createMetric(false, ts, keyNodeRebirth, MetricDataType.Boolean));
listKeys.add(keyNodeRebirth);
payloadBirthNode.addMetrics(createMetric(metricValue, ts, metricKey, metricDataType));
listKeys.add(metricKey);
@ -157,6 +214,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
return listKeys;
}
protected void createdAddMetricValuePrimitiveTsKv(List<TsKvEntry> listTsKvEntry, List<String> listKeys,
@ -369,11 +427,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
@Override
public void messageArrived(String topic, MqttMessage mqttMsg) throws Exception {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(mqttMsg.getPayload());
System.out.println("Message Arrived on topic " + topic);
for (SparkplugBProto.Payload.Metric metric : sparkplugBProtoNode.getMetricsList()) {
System.out.println("Metric: " + metric.toString());
messageArrivedMetrics.add(metric);
}
messageArrivedMetrics.addAll(sparkplugBProtoNode.getMetricsList());
}
@Override

View File

@ -15,22 +15,24 @@
*/
package org.thingsboard.server.transport.mqtt.sparkplug.attributes;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
/**
* Created by nickAS21 on 12.01.23
@ -38,8 +40,6 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataTyp
@Slf4j
public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends AbstractMqttV5ClientSparkplugTest {
protected ThreadLocalRandom random = ThreadLocalRandom.current();
/**
* "sparkPlugAttributesMetricNames": ["SN node", "SN device", "Firmware version", "Date version", "Last date update"]
* @throws Exception
@ -47,22 +47,20 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientWithCorrectAccessTokenPublishNCMDReBirth() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32());
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
// Shared attribute "Node Control/Rebirth" = true. type = NCMD.
String key = "Node Control/Rebirth";
boolean value = true;
Assert.assertTrue(listKeys.contains(key));
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + key + "\":" + value + "}";
Assert.assertTrue(listKeys.contains(keyNodeRebirth));
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + keyNodeRebirth + "\":" + value + "}";
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(key, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(keyNodeRebirth, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(mqttCallback.getMessageArrivedMetrics().get(0).getBooleanValue());
}
@ -75,13 +73,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
*/
protected void processClientWithCorrectAccessTokenPublishNCMD_BooleanType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
MetricDataType metricDataType = MetricDataType.Boolean;
String metricKey = "MyBoolean";
Object metricValue = nextBoolean();
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
// Boolean <-> String
boolean expectedValue = true;
@ -139,13 +136,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientWithCorrectAccessTokenPublishNCMD_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
MetricDataType metricDataType = UInt32;
String metricKey = "MyLong";
Object metricValue = nextUInt32();
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
// Long <-> String
String valueStr = "123";
@ -191,13 +187,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientWithCorrectAccessTokenPublishNCMD_FloatType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
MetricDataType metricDataType = MetricDataType.Float;
String metricKey = "MyFloat";
Object metricValue = nextFloat(30, 400);
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
// Float <-> String
String valueStr = "123.345";
@ -243,13 +238,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientWithCorrectAccessTokenPublishNCMD_DoubleType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
MetricDataType metricDataType = MetricDataType.Double;
String metricKey = "MyDouble";
Object metricValue = nextDouble();
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
// Double <-> String
String valueStr = "123345456";
@ -295,13 +289,12 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
MetricDataType metricDataType = MetricDataType.String;
String metricKey = "MyString";
Object metricValue = nextString();
connectionWithBirth(listKeys, metricDataType, metricKey, metricValue);
connectionWithNBirth(metricDataType, metricKey, metricValue);
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/NCMD/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
// String <-> Long
long valueLong = 123345456L;
@ -345,4 +338,99 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getStringValue());
}
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
// Integer <-> Integer
int expectedValueInt = 123456;
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + expectedValueInt + "}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(expectedValueInt, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
protected void processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
// Int <-> String
String valueStr = "123";
long expectedValue = Long.valueOf(valueStr);
String SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueStr + "}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
// Int <-> Boolean
Boolean valueBoolean = true;
expectedValue = 1;
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
mqttCallback.deleteMessageArrivedMetrics(0);
valueBoolean = false;
expectedValue = 0;
SHARED_ATTRIBUTES_PAYLOAD = "{\"" + metricBirthName_Int32 + "\":" + valueBoolean + "}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertEquals(expectedValue, mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
}));
return actualKeys.get().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0));
}
protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
}));
return actualKeys.get().size() == 1;
});
Assert.assertEquals(metricBirthName_Int32, actualKeys.get().get(0));
}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.sparkplug.attributes;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.HashSet;
/**
* Created by nickAS21 on 12.01.23
*/
@DaoSqlTest
public class MqttV5ClientSparkplugBAttributesInProfileTest extends AbstractMqttV5ClientSparkplugAttributesTest {
@Before
public void beforeTest() throws Exception {
sparkPlugAttributesMetricNames = new HashSet<>();
sparkPlugAttributesMetricNames.add(metricBirthName_Int32);
beforeSparkplugTest();
}
@After
public void afterTest () throws MqttException {
if (client.isConnected()) {
client.disconnect(); }
}
@Test
public void testClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes();
}
@Test
public void testClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes();
}
}

View File

@ -68,4 +68,14 @@ public class MqttV5ClientSparkplugBAttributesTest extends AbstractMqttV5ClientSp
processClientWithCorrectAccessTokenPublishNCMD_StringType_IfMetricFailedTypeCheck_SendValueOk();
}
@Test
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute() throws Exception {
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttribute();
}
@Test
public void testClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk() throws Exception {
processClientDeviceWithCorrectAccessTokenPublishWithBirth_SharedAttributes_LongType_IfMetricFailedTypeCheck_SendValueOk();
}
}

View File

@ -27,22 +27,17 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
import org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.OFFLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState.ONLINE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.createMetric;
/**
* Created by nickAS21 on 12.01.23
@ -78,15 +73,13 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
}
protected void processClientWithCorrectAccessTokenWithNDEATHCreatedDevices(int cntDevices) throws Exception {
Set<Device> devices = new HashSet<>();
long ts = calendar.getTimeInMillis();
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts);
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
}
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_ALL(int cntDevices) throws Exception {
Set<Device> devices = new HashSet<>();
long ts = calendar.getTimeInMillis();
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts);
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), ONLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
@ -108,9 +101,8 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
}
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OneDeviceOFFLINE(int cntDevices, int indexDeviceDisconnect) throws Exception {
Set<Device> devices = new HashSet<>();
long ts = calendar.getTimeInMillis();
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts);
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
@ -133,9 +125,8 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
}
protected void processConnectClientWithCorrectAccessTokenWithNDEATH_State_ONLINE_All_Then_OFFLINE_All(int cntDevices) throws Exception {
Set<Device> devices = new HashSet<>();
long ts = calendar.getTimeInMillis();
connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(devices, cntDevices, ts);
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(cntDevices, ts);
TsKvEntry tsKvEntry = new BasicTsKvEntry(ts, new StringDataEntry(SparkplugMessageType.STATE.name(), OFFLINE.name()));
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
@ -162,50 +153,6 @@ public abstract class AbstractMqttV5ClientSparkplugConnectionTest extends Abstra
}
}
private void connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(Set<Device> devices, int cntDevices, long ts) throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
MetricDataType metricDataType = Int32;
String key = "Node Metric int32";
int valueDeviceInt32 = 1024;
SparkplugBProto.Payload.Metric metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
SparkplugBProto.Payload.Builder payloadBirthNode = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getBdSeqNum());
payloadBirthNode.addMetrics(metric);
payloadBirthNode.setTimestamp(ts);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.NBIRTH.name() + "/" + edgeNode,
payloadBirthNode.build().toByteArray(), 0, false);
}
metricDataType = Int32;
key = "Device Metric int32";
valueDeviceInt32 = 4024;
metric = createMetric(valueDeviceInt32, ts, key, metricDataType);
for (int i = 0; i < cntDevices; i++) {
SparkplugBProto.Payload.Builder payloadBirthDevice = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts)
.setSeq(getSeqNum());
String deviceName = deviceId + "_" + i;
payloadBirthDevice.addMetrics(metric);
if (client.isConnected()) {
client.publish(NAMESPACE + "/" + groupId + "/" + SparkplugMessageType.DBIRTH.name() + "/" + edgeNode + "/" + deviceName,
payloadBirthDevice.build().toByteArray(), 0, false);
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
});
devices.add(device.get());
}
}
Assert.assertEquals(cntDevices, devices.size());
}
private boolean findEqualsKeyValueInKvEntrys(List<TsKvEntry> finalFuture, TsKvEntry tsKvEntry) {
for (TsKvEntry kvEntry : finalFuture) {
if (kvEntry.getKey().equals(tsKvEntry.getKey()) && kvEntry.getValue().equals(tsKvEntry.getValue())) {

View File

@ -15,45 +15,93 @@
*/
package org.thingsboard.server.transport.mqtt.sparkplug.rpc;
import com.nimbusds.jose.util.StandardCharset;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.transport.mqtt.sparkplug.AbstractMqttV5ClientSparkplugTest;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.exception.ThingsboardErrorCode.INVALID_ARGUMENTS;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.DCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
@Slf4j
public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttIntegrationTest {
public abstract class AbstractMqttV5RpcSparkplugTest extends AbstractMqttV5ClientSparkplugTest {
private static final String DEVICE_RESPONSE = "{\"value1\":\"A\",\"value2\":\"B\"}";
private static final String setSparklpugRpcNodeRequest = "{\"method\": \"NCMD\", \"params\": {\"MyNodeMetric05_String\":\"MyNodeMetric05_String_Value\"}}";
private static final String setSparklpugRpcDeviceRequest = "{\"method\": \"DCMD\", \"params\": {\"MyDeviceMetric05_String\":{\"MyDeviceMetric05_String_Value\"}}";
private static final int metricBirthValue_Int32 = 123456;
private static final String sparkplugRpcRequest = "{\"metricName\":\"" + metricBirthName_Int32 + "\",\"value\":" + metricBirthValue_Int32 + "}";
protected class MqttV5TestRpcCallback extends MqttV5TestCallback {
@Test
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
String expected = "{\"result\":\"Success: " + SparkplugMessageType.NCMD.name() + "\"}";
String actual = sendRPCSparkplug(NCMD.name(), sparkplugRpcRequest, savedGateway);
await(alias + SparkplugMessageType.NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(expected, actual);
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
private final MqttV5TestClient client;
@Test
public void processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
String expected = "{\"result\":\"Success: " + DCMD.name() + "\"}";
String actual = sendRPCSparkplug(DCMD.name() , sparkplugRpcRequest, devices.get(0));
await(alias + NCMD.name())
.atMost(40, TimeUnit.SECONDS)
.until(() -> {
return mqttCallback.getMessageArrivedMetrics().size() == 1;
});
Assert.assertEquals(expected, actual);
Assert.assertEquals(metricBirthName_Int32, mqttCallback.getMessageArrivedMetrics().get(0).getName());
Assert.assertTrue(metricBirthValue_Int32 == mqttCallback.getMessageArrivedMetrics().get(0).getIntValue());
}
public MqttV5TestRpcCallback(MqttV5TestClient client, String awaitSubTopic) {
super(awaitSubTopic);
this.client = client;
}
@Test
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
String invalidateTypeMessageName = "RCMD";
String expected = "{\"result\":\"" + INVALID_ARGUMENTS + "\",\"error\":\"Failed to convert device RPC command to MQTT msg: " +
invalidateTypeMessageName + "{\\\"metricName\\\":\\\"" + metricBirthName_Int32 + "\\\",\\\"value\\\":" + metricBirthValue_Int32 + "}\"}";
String actual = sendRPCSparkplug(invalidateTypeMessageName, sparkplugRpcRequest, savedGateway);
Assert.assertEquals(expected, actual);
}
@Override
protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
String responseTopic = requestTopic.replace("request", "response");
try {
client.publish(responseTopic, DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8));
} catch (MqttException e) {
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e);
}
subscribeLatch.countDown();
}
}
@Test
public void processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
client.subscribeAndWait(NAMESPACE + "/" + groupId + "/" + NCMD.name() + "/" + edgeNode + "/#", MqttQoS.AT_MOST_ONCE);
String metricNameBad = metricBirthName_Int32 + "_Bad";
String sparkplugRpcRequestBad = "{\"metricName\":\"" + metricNameBad + "\",\"value\":" + metricBirthValue_Int32 + "}";
String expected = "{\"result\":\"BAD_REQUEST_PARAMS\",\"error\":\"Failed send To Node Rpc Request: " +
DCMD.name() + ". This node does not have a metricName: [" + metricNameBad + "]\"}";
String actual = sendRPCSparkplug(DCMD.name(), sparkplugRpcRequestBad, savedGateway);
Assert.assertEquals(expected, actual);
}
private String sendRPCSparkplug(String nameTypeMessage, String keyValue, Device device) throws Exception {
String setRpcRequest = "{\"method\": \"" + nameTypeMessage + "\", \"params\": " + keyValue + "}";
return doPostAsync("/api/plugins/rpc/twoway/" + device.getId().getId(), setRpcRequest, String.class, status().isOk());
}
}

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.sparkplug.rpc;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
@DaoSqlTest
@Slf4j
public class MqttV5RpcSparkplugTest extends AbstractMqttV5RpcSparkplugTest {
@Before
public void beforeTest() throws Exception {
beforeSparkplugTest();
}
@After
public void afterTest() throws MqttException {
if (client.isConnected()) {
client.disconnect();
}
}
@Test
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_Success();
}
@Test
public void testClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success() throws Exception {
processClientDeviceWithCorrectAccessTokenPublish_TwoWayRpc_Success();
}
@Test
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS() throws Exception {
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS();
}
@Test
public void testClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InBirthNotHaveMetric_BAD_REQUEST_PARAMS() throws Exception {
processClientNodeWithCorrectAccessTokenPublish_TwoWayRpc_InvalidTypeMessage_INVALID_ARGUMENTS();
}
}

View File

@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Int32;
/**
* Created by nickAS21 on 12.01.23
@ -39,8 +38,7 @@ public abstract class AbstractMqttV5ClientSparkplugTelemetryTest extends Abstrac
protected void processClientWithCorrectAccessTokenPublishNBIRTH() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
List<String> listKeys = new ArrayList<>();
connectionWithBirth(listKeys, Int32, "Node Metric int32", nextInt32());
List<String> listKeys = connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
Assert.assertTrue("Connection node is failed", client.isConnected());
AtomicReference<ListenableFuture<List<TsKvEntry>>> finalFuture = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.