From a405999a88b36e7984619df4d5985a4373e63fab Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 25 Apr 2023 19:03:17 +0300 Subject: [PATCH 1/3] Use shared attributes for sparkplug devices --- common/cluster-api/src/main/proto/queue.proto | 1 + .../mqtt/session/AbstractGatewaySessionHandler.java | 2 +- .../mqtt/session/SparkplugNodeSessionHandler.java | 3 ++- .../common/transport/adaptor/ProtoConverter.java | 10 ++++++---- .../transport/service/DefaultTransportService.java | 3 +++ 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5198c6fc8d..80f44e59be 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -148,6 +148,7 @@ message PostTelemetryMsg { message PostAttributeMsg { repeated KeyValueProto kv = 1; + bool shared = 2; } message GetAttributeRequestMsg { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index ab0b7be5db..95603e751c 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -535,7 +535,7 @@ public abstract class AbstractGatewaySessionHandler kvList = proto.getKvList(); - if (!CollectionUtils.isEmpty(kvList)) { + public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(TransportProtos.PostAttributeMsg msg) throws IllegalArgumentException, InvalidProtocolBufferException { + if (!CollectionUtils.isEmpty(msg.getKvList())) { + byte[] bytes = msg.toByteArray(); + TransportProtos.PostAttributeMsg proto = TransportProtos.PostAttributeMsg.parseFrom(bytes); + List kvList = proto.getKvList(); List keyValueProtos = validateKeyValueProtos(kvList); TransportProtos.PostAttributeMsg.Builder result = TransportProtos.PostAttributeMsg.newBuilder(); result.addAllKv(keyValueProtos); + result.setShared(msg.getShared()); return result.build(); } else { throw new IllegalArgumentException("KeyValue list is empty!"); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 883522881f..b495448d90 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -593,6 +593,9 @@ public class DefaultTransportService implements TransportService { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); + if (msg.getShared()) { + metaData.putValue(DataConstants.SCOPE, DataConstants.SHARED_SCOPE); + } metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false"); CustomerId customerId = getCustomerId(sessionInfo); sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, From 8e20fd88007a662b53bcf3dc31e900cae966d2e5 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 27 Apr 2023 17:08:34 +0300 Subject: [PATCH 2/3] Metric wildcards and default attributes --- .../session/SparkplugNodeSessionHandler.java | 16 ++++++++++++++-- ui-ngx/src/app/shared/models/device.models.ts | 1 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 9cf3495273..6b107683e0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -233,7 +233,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) { + if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) { long ts = protoMetric.getTimestamp(); String key = "bdSeq".equals(protoMetric.getName()) ? topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); @@ -264,7 +264,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (attributesMetricNames.contains(protoMetric.getName())) { + if (matches(attributesMetricNames, protoMetric)) { TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); Optional msgOpt = getPostAttributeMsg(protoMetric); if (msgOpt.isPresent()) { @@ -281,6 +281,18 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) { + String metricName = protoMetric.getName(); + for (String attributeMetricFilter : attributesMetricNames) { + if (metricName.equals(attributeMetricFilter) || + (attributeMetricFilter.endsWith("*") && metricName.startsWith( + attributeMetricFilter.substring(0, attributeMetricFilter.length() - 1)))) { + return true; + } + } + return false; + } + private Optional getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); if (keyValueProtoOpt.isPresent()) { diff --git a/ui-ngx/src/app/shared/models/device.models.ts b/ui-ngx/src/app/shared/models/device.models.ts index 1cf06e9ec6..b975d511ff 100644 --- a/ui-ngx/src/app/shared/models/device.models.ts +++ b/ui-ngx/src/app/shared/models/device.models.ts @@ -368,6 +368,7 @@ export function createDeviceProfileTransportConfiguration(type: DeviceTransportT deviceAttributesTopic: 'v1/devices/me/attributes', deviceAttributesSubscribeTopic: 'v1/devices/me/attributes', sparkplug: false, + sparkplugAttributesMetricNames: ['Node Control/*', 'Device Control/*', 'Properties/*'], sendAckOnValidationException: false, transportPayloadTypeConfiguration: { transportPayloadType: TransportPayloadType.JSON, From df0bd28ae543a23019fd5ea62559ff11283b4ae9 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 1 May 2023 13:37:17 +0300 Subject: [PATCH 3/3] Fix Sparkplug tests --- .../AbstractMqttV5ClientSparkplugAttributesTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 4e05e44668..31d6233a69 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; +import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; @@ -409,7 +409,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); - String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + SHARED_SCOPE; AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) @@ -424,7 +424,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { long ts = calendar.getTimeInMillis(); List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); - String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + SHARED_SCOPE; AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.DBIRTH.name()) .atMost(40, TimeUnit.SECONDS)