diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 4e05e44668..31d6233a69 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE; +import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD; import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE; @@ -409,7 +409,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { clientWithCorrectNodeAccessTokenWithNDEATH(); connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32()); - String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + SHARED_SCOPE; AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) @@ -424,7 +424,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception { long ts = calendar.getTimeInMillis(); List devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts); - String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE; + String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + SHARED_SCOPE; AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.DBIRTH.name()) .atMost(40, TimeUnit.SECONDS) diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5198c6fc8d..80f44e59be 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -148,6 +148,7 @@ message PostTelemetryMsg { message PostAttributeMsg { repeated KeyValueProto kv = 1; + bool shared = 2; } message GetAttributeRequestMsg { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index ab0b7be5db..95603e751c 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -535,7 +535,7 @@ public abstract class AbstractGatewaySessionHandler msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) { + if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) { long ts = protoMetric.getTimestamp(); String key = "bdSeq".equals(protoMetric.getName()) ? topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); @@ -264,7 +264,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - if (attributesMetricNames.contains(protoMetric.getName())) { + if (matches(attributesMetricNames, protoMetric)) { TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); Optional msgOpt = getPostAttributeMsg(protoMetric); if (msgOpt.isPresent()) { @@ -281,11 +281,24 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) { + String metricName = protoMetric.getName(); + for (String attributeMetricFilter : attributesMetricNames) { + if (metricName.equals(attributeMetricFilter) || + (attributeMetricFilter.endsWith("*") && metricName.startsWith( + attributeMetricFilter.substring(0, attributeMetricFilter.length() - 1)))) { + return true; + } + } + return false; + } + private Optional getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); if (keyValueProtoOpt.isPresent()) { TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder(); builder.addKv(keyValueProtoOpt.get()); + builder.setShared(true); return Optional.of(builder.build()); } return Optional.empty(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java index 573916964a..2ce6260fa2 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java @@ -67,13 +67,15 @@ public class ProtoConverter { } } - public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(byte[] bytes) throws IllegalArgumentException, InvalidProtocolBufferException { - TransportProtos.PostAttributeMsg proto = TransportProtos.PostAttributeMsg.parseFrom(bytes); - List kvList = proto.getKvList(); - if (!CollectionUtils.isEmpty(kvList)) { + public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(TransportProtos.PostAttributeMsg msg) throws IllegalArgumentException, InvalidProtocolBufferException { + if (!CollectionUtils.isEmpty(msg.getKvList())) { + byte[] bytes = msg.toByteArray(); + TransportProtos.PostAttributeMsg proto = TransportProtos.PostAttributeMsg.parseFrom(bytes); + List kvList = proto.getKvList(); List keyValueProtos = validateKeyValueProtos(kvList); TransportProtos.PostAttributeMsg.Builder result = TransportProtos.PostAttributeMsg.newBuilder(); result.addAllKv(keyValueProtos); + result.setShared(msg.getShared()); return result.build(); } else { throw new IllegalArgumentException("KeyValue list is empty!"); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 883522881f..b495448d90 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -593,6 +593,9 @@ public class DefaultTransportService implements TransportService { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); + if (msg.getShared()) { + metaData.putValue(DataConstants.SCOPE, DataConstants.SHARED_SCOPE); + } metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false"); CustomerId customerId = getCustomerId(sessionInfo); sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, diff --git a/ui-ngx/src/app/shared/models/device.models.ts b/ui-ngx/src/app/shared/models/device.models.ts index 94231a26f6..f7802b3a8b 100644 --- a/ui-ngx/src/app/shared/models/device.models.ts +++ b/ui-ngx/src/app/shared/models/device.models.ts @@ -371,6 +371,7 @@ export const createDeviceProfileTransportConfiguration = (type: DeviceTransportT deviceAttributesTopic: 'v1/devices/me/attributes', deviceAttributesSubscribeTopic: 'v1/devices/me/attributes', sparkplug: false, + sparkplugAttributesMetricNames: ['Node Control/*', 'Device Control/*', 'Properties/*'], sendAckOnValidationException: false, transportPayloadTypeConfiguration: { transportPayloadType: TransportPayloadType.JSON,