sparkplug: attribute device/node from deviceProfile

This commit is contained in:
nickAS21 2023-02-15 17:31:16 +02:00
parent 524952d264
commit f4af14ed42
3 changed files with 96 additions and 20 deletions

View File

@ -402,11 +402,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
break;
case NBIRTH:
sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList());
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NCMD:
case NDATA:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NRECORD:
// TODO
@ -423,7 +423,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case DBIRTH:
case DCMD:
case DDATA:
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
sparkplugSessionHandler.onAttributesTelemetryProto(msgId, sparkplugBProtoDevice, deviceName, sparkplugTopic);
break;
/**
* TODO
@ -1112,7 +1112,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (sparkplugTopicNode != null) {
SparkplugBProto.Payload sparkplugBProtoNode = SparkplugBProto.Payload.parseFrom(connectMessage.payload().willMessageInBytes());
sparkplugSessionHandler = new SparkplugNodeSessionHandler(this, deviceSessionCtx, sessionId, sparkplugTopicNode);
sparkplugSessionHandler.onTelemetryProto(0, sparkplugBProtoNode,
sparkplugSessionHandler.onAttributesTelemetryProto(0, sparkplugBProtoNode,
deviceSessionCtx.getDeviceInfo().getDeviceName(), sparkplugTopicNode);
} else {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect: sparkplugTopicName without SparkplugMessageType.NDEATH.", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName());

View File

@ -557,7 +557,7 @@ public abstract class AbstractGatewaySessionHandler {
}
}
private void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
protected void processPostAttributesMsg(MqttDeviceAwareSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
}

View File

@ -27,12 +27,15 @@ import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode;
import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
@ -44,6 +47,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -96,11 +100,11 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
public void onAttributesTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
// add Msg Telemetry: key STATE type: String value: ONLINE ts: sparkplugBProto.getTimestamp()
@ -111,22 +115,25 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
log.error("Failed add Metrics. MessageType *BIRTH.", e);
}
}
onDeviceTelemetryProto(contextListenableFuture, msgId, msgs, deviceName);
List<TransportApiProtos.AttributesMsg> attributesMsgList = convertToPostAttributes(sparkplugBProto, deviceName);
onDeviceAttributesProto(contextListenableFuture, msgId, attributesMsgList, deviceName);
List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
onDeviceTelemetryProto(contextListenableFuture, msgId, postTelemetryMsgList, deviceName);
}
public void onDeviceTelemetryProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture,
int msgId, List<TransportProtos.PostTelemetryMsg> msgs, String deviceName) throws AdaptorException {
int msgId, List<TransportProtos.PostTelemetryMsg> postTelemetryMsgList, String deviceName) throws AdaptorException {
try {
int finalMsgId = msgId;
for (TransportProtos.PostTelemetryMsg msg : msgs) {
postTelemetryMsgList.forEach(telemetryMsg -> {
Futures.addCallback(contextListenableFuture,
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
try {
processPostTelemetryMsg(deviceCtx, msg, deviceName, finalMsgId);
processPostTelemetryMsg(deviceCtx, telemetryMsg, deviceName, finalMsgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, msg, e);
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, telemetryMsg, e);
channel.close();
}
}
@ -136,6 +143,38 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
});
} catch (RuntimeException e) {
throw new AdaptorException(e);
}
}
private void onDeviceAttributesProto(ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture, int msgId,
List<TransportApiProtos.AttributesMsg> attributesMsgList, String deviceName) throws AdaptorException {
try {
if (!CollectionUtils.isEmpty(attributesMsgList)) {
attributesMsgList.forEach(attributesMsg -> {
Futures.addCallback(contextListenableFuture,
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
}
}
@Override
public void onFailure(Throwable t) {
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
});
} else {
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
}
} catch (RuntimeException e) {
throw new AdaptorException(e);
@ -180,13 +219,17 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
private List<TransportProtos.PostTelemetryMsg> convertToPostTelemetry(SparkplugBProto.Payload sparkplugBProto, String topicTypeName) throws AdaptorException {
try {
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
Set<String> attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx
.getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
long ts = protoMetric.getTimestamp();
String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
if (keyValueProtoOpt.isPresent()) {
msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts));
if (!attributesMetricNames.contains(protoMetric.getName())) {
long ts = protoMetric.getTimestamp();
String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
if (keyValueProtoOpt.isPresent()) {
msgs.add(postTelemetryMsgCreated(keyValueProtoOpt.get(), ts));
}
}
}
@ -204,6 +247,39 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
private List<TransportApiProtos.AttributesMsg> convertToPostAttributes(SparkplugBProto.Payload sparkplugBProto, String deviceName) throws AdaptorException {
try {
List<TransportApiProtos.AttributesMsg> msgs = new ArrayList<>();
Set<String> attributesMetricNames = ((MqttDeviceProfileTransportConfiguration) deviceSessionCtx
.getDeviceProfile().getProfileData().getTransportConfiguration()).getSparkPlugAttributesMetricNames();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
if (attributesMetricNames.contains(protoMetric.getName())) {
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric);
if (msgOpt.isPresent()) {
deviceAttributesMsgBuilder.setDeviceName(deviceName);
deviceAttributesMsgBuilder.setMsg(msgOpt.get());
msgs.add(deviceAttributesMsgBuilder.build());
}
}
}
return msgs;
} catch (IllegalStateException | JsonSyntaxException | ThingsboardException e) {
log.error("Failed to decode post telemetry request", e);
throw new AdaptorException(e);
}
}
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric);
if (keyValueProtoOpt.isPresent()) {
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder();
builder.addKv(keyValueProtoOpt.get());
return Optional.of(builder.build());
}
return Optional.empty();
}
public SparkplugTopic getSparkplugTopicNode() {
return this.sparkplugTopicNode;
}
@ -238,7 +314,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
return new SparkplugDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
}
protected void sendToDeviceRpcRequest (MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
protected void sendToDeviceRpcRequest(MqttMessage payload, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) {
parent.sendToDeviceRpcRequest(payload, rpcRequest, sessionInfo);
}
@ -247,7 +323,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
protected void sendSuccessRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ResponseCode result, String successMsg) {
parent.sendSuccessRpcResponse(sessionInfo, requestId,result, successMsg);
parent.sendSuccessRpcResponse(sessionInfo, requestId, result, successMsg);
}