sparkplug: rpc device

This commit is contained in:
nickAS21 2023-02-15 14:19:10 +02:00
parent d7050074bc
commit 524952d264
4 changed files with 57 additions and 46 deletions

View File

@ -1271,13 +1271,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
* NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} * NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222}
* NCMD {"metricName":"Node Control/Rebirth","value":false} * NCMD {"metricName":"Node Control/Rebirth","value":false}
* NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} * NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]}
* NCMD {"metricName":"Node Control/Rebirth", "value":false}
* without backspace
*/ */
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
if (messageType == null) { if (messageType == null) {
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
ResponseCode.METHOD_NOT_ALLOWED, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams());
return; return;
} }
SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
@ -1289,30 +1287,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(), sparkplugTopic.toString(),
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
} else {
sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
ThingsboardErrorCode.BAD_REQUEST_PARAMS, "Failed send To Node Rpc Request: " +
rpcRequest.getMethodName() + ". This node does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
} }
} else { } else {
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase(); String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType); MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic) adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic)
.ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo()));
} }
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}] Failed to convert device RPC command to Sparkplug MQTT msg", sessionId, e); log.trace("[{}] Failed to convert device RPC command to MQTT msg", sessionId, e);
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(),
ResponseCode.METHOD_NOT_ALLOWED, ThingsboardErrorCode.INVALID_ARGUMENTS,
"Failed to convert device RPC command to Sparkplug MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams()); "Failed to convert device RPC command to MQTT msg: " + rpcRequest.getMethodName() + rpcRequest.getParams());
} }
} }
public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { public void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId(); int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
if (isAckExpected(payload)) { if (isAckExpected(payload)) {
rpcAwaitingAck.put(msgId, rpcRequest); rpcAwaitingAck.put(msgId, rpcRequest);
context.getScheduler().schedule(() -> { context.getScheduler().schedule(() -> {
TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId); TransportProtos.ToDeviceRpcRequestMsg msg = rpcAwaitingAck.remove(msgId);
if (msg != null) { if (msg != null) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY); transportService.process(sessionInfo, rpcRequest, RpcStatus.TIMEOUT, TransportServiceCallback.EMPTY);
} }
}, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS); }, Math.max(0, Math.min(deviceSessionCtx.getContext().getTimeout(), rpcRequest.getExpirationTime() - System.currentTimeMillis())), TimeUnit.MILLISECONDS);
} }
@ -1320,15 +1322,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
cf.addListener(result -> { cf.addListener(result -> {
if (result.cause() == null) { if (result.cause() == null) {
if (!isAckExpected(payload)) { if (!isAckExpected(payload)) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); transportService.process(sessionInfo, rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
} else if (rpcRequest.getPersisted()) { } else if (rpcRequest.getPersisted()) {
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY); transportService.process(sessionInfo, rpcRequest, RpcStatus.SENT, TransportServiceCallback.EMPTY);
} }
this.sendSuccessRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName()); this.sendSuccessRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.CONTENT, "Success: " + rpcRequest.getMethodName());
} else { } else {
log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName()); log.trace("[{}] Failed send To Device Rpc Request [{}]", sessionId, rpcRequest.getMethodName());
this.sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
ResponseCode.METHOD_NOT_ALLOWED, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName()); ThingsboardErrorCode.INVALID_ARGUMENTS, " Failed send To Device Rpc Request: " + rpcRequest.getMethodName());
} }
}); });
} }
@ -1370,8 +1372,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.close(); ctx.close();
} }
public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String errorMsg) { public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {
String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.getName()).error(errorMsg).build()); String payload = JacksonUtil.toString(SparkplugRpcResponseBody.builder().result(result.name()).error(errorMsg).build());
TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build(); TransportProtos.ToDeviceRpcResponseMsg msg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder().setRequestId(requestId).setError(payload).build();
transportService.process(sessionInfo, msg, null); transportService.process(sessionInfo, msg, null);
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.exception.ThingsboardException;
@ -23,11 +24,15 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import java.util.Date;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetricUtil.getTsKvProto;
@Slf4j @Slf4j
public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext<SparkplugNodeSessionHandler> { public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionContext<SparkplugNodeSessionHandler> {
@ -60,34 +65,38 @@ public class SparkplugDeviceSessionContext extends AbstractGatewayDeviceSessionC
log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId); log.trace("[{}] Received RPC Request notification to sparkplug device", sessionId);
try { try {
/** /**
* NCMD {"metricName":"MyNodeMetric05_String","value":"MyNodeMetric05_String_Value"} * DCMD {"metricName":"MyDeviceMetricText","value":"MyNodeMetric05_String_Value"}
* NCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444} * DCMD {"metricName":"MyNodeMetric02_LongInt64","value":2814119464032075444}
* NCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333} * DCMD {"metricName":"MyNodeMetric03_Double","value":6336935578763180333}
* NCMD {"metricName":"MyNodeMetric04_Float","value":413.18222} * DCMD {"metricName":"MyNodeMetric04_Float","value":413.18222}
* NCMD {"metricName":"Node Control/Rebirth","value":false} * DCMD {"metricName":"Node Control/Rebirth","value":false}
* NCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]} * DCMD {"metricName":"MyNodeMetric06_Json_Bytes", "value":[40,47,-49]}
* NCMD {"metricName":"Node Control/Rebirth", "value":false}
* without backspace
*/ */
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
// if (messageType == null) { if (messageType == null) {
// parent.sendErrorRpcResponse(parent.deviceSessionCtx., rpcRequest.getRequestId(), parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
// ResponseCode.METHOD_NOT_ALLOWED, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams()); ThingsboardErrorCode.INVALID_ARGUMENTS, "Unsupported SparkplugMessageType: " + rpcRequest.getMethodName() + rpcRequest.getParams());
// return; return;
// } }
// SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class); SparkplugRpcRequestHeader header = JacksonUtil.fromString(rpcRequest.getParams(), SparkplugRpcRequestHeader.class);
// header.setMessageType(messageType.name()); header.setMessageType(messageType.name());
// TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime()); TransportProtos.TsKvProto tsKvProto = getTsKvProto(header.getMetricName(), header.getValue(), new Date().getTime());
// if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
// SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), SparkplugTopic sparkplugTopic = new SparkplugTopic(parent.getSparkplugTopicNode(),
// messageType); messageType, deviceInfo.getDeviceName());
// sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, parent.createSparkplugMqttPublishMsg(tsKvProto,
// sparkplugTopic.toString(), sparkplugTopic.toString(),
// sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
// .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest)); .ifPresent(payload -> parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo));
// } } else {
parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
rpcRequest.getMethodName() + ". This device does not have a metricName: [" + tsKvProto.getKv().getKey() + "]");
}
} catch (ThingsboardException e) { } catch (ThingsboardException e) {
new ThingsboardException(e.getMessage(), ThingsboardErrorCode.INVALID_ARGUMENTS); parent.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(),
ThingsboardErrorCode.BAD_REQUEST_PARAMS, " Failed send To Device Rpc Request: " +
rpcRequest.getMethodName() + ". " + e.getMessage());
} }
} }

View File

@ -238,11 +238,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
} }
protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
parent.sendToDeviceRpcRequest(payload, rpcRequest); parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo);
} }
protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String errorMsg) { protected void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {
parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg); parent.sendErrorRpcResponse(sessionInfo, requestId, result, errorMsg);
} }

View File

@ -46,7 +46,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
protected volatile DeviceProfile deviceProfile; protected volatile DeviceProfile deviceProfile;
@Getter @Getter
@Setter @Setter
private volatile TransportProtos.SessionInfoProto sessionInfo; protected volatile TransportProtos.SessionInfoProto sessionInfo;
@Setter @Setter
private volatile boolean connected; private volatile boolean connected;