diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 910d257640..058430da4b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1271,13 +1271,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement * NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} * NCMD {"metricName":"Node Control/Rebirth","value":false} * NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} - * NCMD {"metricName":"Node Control/Rebirth", "value":false} - * without backspace */ SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); if (messageType == null) { this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), - ResponseCode.METHOD_NOT_ALLOWED, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); return; } SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); @@ -1289,30 +1287,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, sparkplugTopic.toString(), sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) - .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); + .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); + } else { + sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " + + rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); } } else { String baseTopic = rpcSubTopicType.getRpcRequestTopicBase(); MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType); adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic) - .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); + .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); } } catch (Exception e) { - log.trace("[{}] Failed to convert device RPC command to Sparkplug MQTT msg", sessionId, e); + log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e); this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), - ResponseCode.METHOD_NOT_ALLOWED, - "Failed to convert device RPC command to Sparkplug MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + ThingsboardErrorCode.INVALID_ARGUMENTS, + "Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams()); } } - public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { + public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); if (isAckExpected(payload)) { rpcAwaitingAck.put(msgId, rpcRequest); context.getScheduler().schedule(() -> { TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId); if (msg != null) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); + transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); } }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); } @@ -1320,15 +1322,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement cf.addListener(result -> { if (result.cause() == null) { if (!isAckExpected(payload)) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } else if (rpcRequest.getPersisted()) { - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); + transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); } - this.sendSuccessRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); + this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); } else { log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName()); - this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), - ResponseCode.METHOD_NOT_ALLOWED, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); } }); } @@ -1370,8 +1372,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement ctx.close(); } - public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String errorMsg) { - String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.getName()).error(errorMsg).build()); + public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { + String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.name()).error(errorMsg).build()); TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build(); transportService.process(sessionInfo, msg, null); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index e4db1f04f0..7e8e7421e4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.mqtt.session; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; @@ -23,11 +24,15 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; +import java.util.Date; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto; + @Slf4j public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext { @@ -60,34 +65,38 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId); try { /** - * NCMD {"metricName":"MyNodeMetric05_String","value":"MyNodeMetric05_String_Value"} - * NCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444} - * NCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333} - * NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} - * NCMD {"metricName":"Node Control/Rebirth","value":false} - * NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} - * NCMD {"metricName":"Node Control/Rebirth", "value":false} - * without backspace + * DCMD {"metricName":"MyDeviceMetricText","value":"MyNodeMetric05_String_Value"} + * DCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444} + * DCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333} + * DCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} + * DCMD {"metricName":"Node Control/Rebirth","value":false} + * DCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} */ SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); -// if (messageType == null) { -// parent.sendErrorRpcResponse(parent.deviceSessionCtx., rpcRequest.getRequestId(), -// ResponseCode.METHOD_NOT_ALLOWED, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); -// return; -// } -// SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); -// header.setMessageType(messageType.name()); -// TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); -// if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { -// SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), -// messageType); -// sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, -// sparkplugTopic.toString(), -// sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) -// .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); -// } + if (messageType == null) { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); + return; + } + SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); + header.setMessageType(messageType.name()); + TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); + if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(), + messageType, deviceInfo.getDeviceName()); + parent.createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) + .ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo)); + } else { + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + + rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]"); + } } catch (ThingsboardException e) { - new ThingsboardException(e.getMessage(), ThingsboardErrorCode.INVALID_ARGUMENTS); + parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), + ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " + + rpcRequest.getMethodName() + ". " + e.getMessage()); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index b55170e70b..fef4cd112a 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -238,11 +238,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); } - protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { - parent.sendToDeviceRpcRequest(payload, rpcRequest); + protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { + parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo); } - protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String errorMsg) { + protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 1ec98aac9f..f5fa34e218 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -46,7 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { protected volatile DeviceProfile deviceProfile; @Getter @Setter - private volatile TransportProtos.SessionInfoProto sessionInfo; + protected volatile TransportProtos.SessionInfoProto sessionInfo; @Setter private volatile boolean connected;