diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 058430da4b..c930fd460b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -402,11 +402,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement break; case NBIRTH: sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList()); - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; case NCMD: case NDATA: - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; case NRECORD: // TODO @@ -423,7 +423,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case DBIRTH: case DCMD: case DDATA: - sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); + sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic); break; /** * TODO @@ -1112,7 +1112,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (sparkplugTopicNode != null) { SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes()); sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode); - sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode, + sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode, deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode); } else { log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName()); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 13ffb960a5..291da6564a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -557,7 +557,7 @@ public abstract class AbstractGatewaySessionHandler { } } - private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { + protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) { transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg)); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index fef4cd112a..73d9f1f19f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -27,12 +27,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import lombok.extern.slf4j.Slf4j; import org.eclipse.leshan.core.ResponseCode; +import org.springframework.util.CollectionUtils; +import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; +import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; @@ -44,6 +47,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -96,11 +100,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { + public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { checkDeviceName(deviceName); ListenableFuture contextListenableFuture = topic.isNode() ? Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); - List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); + if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { try { // add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp() @@ -111,22 +115,25 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { log.error("Failed add Metrics. MessageType *BIRTH.", e); } } - onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName); + List attributesMsgList = convertToPostAttributes(sparkplugBProto, deviceName); + onDeviceAttributesProto(contextListenableFuture, msgId, attributesMsgList, deviceName); + List postTelemetryMsgList = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); + onDeviceTelemetryProto(contextListenableFuture, msgId, postTelemetryMsgList, deviceName); } public void onDeviceTelemetryProto(ListenableFuture contextListenableFuture, - int msgId, List msgs, String deviceName) throws AdaptorException { + int msgId, List postTelemetryMsgList, String deviceName) throws AdaptorException { try { int finalMsgId = msgId; - for (TransportProtos.PostTelemetryMsg msg : msgs) { + postTelemetryMsgList.forEach(telemetryMsg -> { Futures.addCallback(contextListenableFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { try { - processPostTelemetryMsg(deviceCtx, msg, deviceName, finalMsgId); + processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId); } catch (Throwable e) { - log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, msg, e); + log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e); channel.close(); } } @@ -136,6 +143,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t); } }, context.getExecutor()); + }); + } catch (RuntimeException e) { + throw new AdaptorException(e); + } + } + + private void onDeviceAttributesProto(ListenableFuture contextListenableFuture, int msgId, + List attributesMsgList, String deviceName) throws AdaptorException { + try { + if (!CollectionUtils.isEmpty(attributesMsgList)) { + attributesMsgList.forEach(attributesMsg -> { + Futures.addCallback(contextListenableFuture, + new FutureCallback<>() { + @Override + public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { + TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); + try { + TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray()); + processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); + } catch (Throwable e) { + log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); + } + } + + @Override + public void onFailure(Throwable t) { + log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t); + } + }, context.getExecutor()); + }); + } else { + log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId()); } } catch (RuntimeException e) { throw new AdaptorException(e); @@ -180,13 +219,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { private List convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, String topicTypeName) throws AdaptorException { try { List msgs = new ArrayList<>(); + Set attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx + .getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { - long ts = protoMetric.getTimestamp(); - String key = "bdSeq".equals(protoMetric.getName()) ? - topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); - Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); - if (keyValueProtoOpt.isPresent()) { - msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts)); + if (!attributesMetricNames.contains(protoMetric.getName())) { + long ts = protoMetric.getTimestamp(); + String key = "bdSeq".equals(protoMetric.getName()) ? + topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); + if (keyValueProtoOpt.isPresent()) { + msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts)); + } } } @@ -204,6 +247,39 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } + private List convertToPostAttributes(SparkplugBProto.Payload sparkplugBProto, String deviceName) throws AdaptorException { + try { + List msgs = new ArrayList<>(); + Set attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx + .getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames(); + for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { + if (attributesMetricNames.contains(protoMetric.getName())) { + TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); + Optional msgOpt = getPostAttributeMsg(protoMetric); + if (msgOpt.isPresent()) { + deviceAttributesMsgBuilder.setDeviceName(deviceName); + deviceAttributesMsgBuilder.setMsg(msgOpt.get()); + msgs.add(deviceAttributesMsgBuilder.build()); + } + } + } + return msgs; + } catch (IllegalStateException | JsonSyntaxException | ThingsboardException e) { + log.error("Failed to decode post telemetry request", e); + throw new AdaptorException(e); + } + } + + private Optional getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); + if (keyValueProtoOpt.isPresent()) { + TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder(); + builder.addKv(keyValueProtoOpt.get()); + return Optional.of(builder.build()); + } + return Optional.empty(); + } + public SparkplugTopic getSparkplugTopicNode() { return this.sparkplugTopicNode; } @@ -238,7 +314,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); } - protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { + protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo); } @@ -247,7 +323,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } protected void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) { - parent.sendSuccessRpcResponse(sessionInfo, requestId,result, successMsg); + parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg); }