Fix server-side RPC requests handling
This commit is contained in:
parent
c905f20224
commit
0f5fd481fc
@ -1253,35 +1253,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
log.trace("[{}] Received RPC command to device", sessionId);
|
log.trace("[{}] Received RPC command to device", sessionId);
|
||||||
try {
|
try {
|
||||||
if (sparkplugSessionHandler != null) {
|
if (sparkplugSessionHandler != null) {
|
||||||
/**
|
handleToSparkplugDeviceRpcRequest(rpcRequest);
|
||||||
* NCMD {"metricName":"MyNodeMetric05_String","value":"MyNodeMetric05_String_Value"}
|
|
||||||
* NCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444}
|
|
||||||
* NCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333}
|
|
||||||
* NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222}
|
|
||||||
* NCMD {"metricName":"Node Control/Rebirth","value":false}
|
|
||||||
* NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]}
|
|
||||||
*/
|
|
||||||
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
|
|
||||||
if (messageType == null) {
|
|
||||||
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
|
||||||
ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
|
|
||||||
header.setMessageType(messageType.name());
|
|
||||||
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
|
|
||||||
if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
|
|
||||||
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
|
|
||||||
messageType);
|
|
||||||
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
|
|
||||||
sparkplugTopic.toString(),
|
|
||||||
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
|
|
||||||
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
|
|
||||||
} else {
|
|
||||||
sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
|
||||||
ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " +
|
|
||||||
rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
|
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
|
||||||
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
|
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
|
||||||
@ -1293,10 +1265,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
||||||
ThingsboardErrorCode.INVALID_ARGUMENTS,
|
ThingsboardErrorCode.INVALID_ARGUMENTS,
|
||||||
"Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
"Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
|
private void handleToSparkplugDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws ThingsboardException {
|
||||||
|
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
|
||||||
|
SparkplugRpcRequestHeader header;
|
||||||
|
if (StringUtils.isNotEmpty(rpcRequest.getParams())) {
|
||||||
|
header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
|
||||||
|
} else {
|
||||||
|
header = new SparkplugRpcRequestHeader();
|
||||||
|
}
|
||||||
|
header.setMessageType(messageType.name());
|
||||||
|
TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
|
||||||
|
if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
|
||||||
|
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
|
||||||
|
messageType);
|
||||||
|
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
|
||||||
|
sparkplugTopic.toString(),
|
||||||
|
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
|
||||||
|
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
|
||||||
|
} else {
|
||||||
|
sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
|
||||||
|
ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " +
|
||||||
|
rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
|
||||||
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
|
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
|
||||||
if (isAckExpected(payload)) {
|
if (isAckExpected(payload)) {
|
||||||
rpcAwaitingAck.put(msgId, rpcRequest);
|
rpcAwaitingAck.put(msgId, rpcRequest);
|
||||||
@ -1315,7 +1311,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
} else if (rpcRequest.getPersisted()) {
|
} else if (rpcRequest.getPersisted()) {
|
||||||
transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
|
transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
|
||||||
}
|
}
|
||||||
this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
|
if (sparkplugSessionHandler != null) {
|
||||||
|
this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName());
|
log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName());
|
||||||
this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user