sparkplug: Test Telemetry comment

This commit is contained in:
nickAS21 2023-01-23 20:42:28 +02:00
parent fa4c00c437
commit be9c213dc4
4 changed files with 107 additions and 85 deletions

View File

@ -330,6 +330,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (gatewaySessionHandler != null) { if (gatewaySessionHandler != null) {
handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
transportService.reportActivity(deviceSessionCtx.getSessionInfo()); transportService.reportActivity(deviceSessionCtx.getSessionInfo());
} else {
log.error("[gatewaySessionHandler] is null, [{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId);
} }
} else if (sparkplugSessionHandler != null) { } else if (sparkplugSessionHandler != null) {
handleSparkplugPublishMsg(ctx, topicName, mqttMsg); handleSparkplugPublishMsg(ctx, topicName, mqttMsg);
@ -375,10 +377,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsgOld) { private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsg) {
MqttPublishMessage mqttMsg = sparkplugSessionHandler.reCreateMqttPublishMessageWithPacketId(mqttMsgOld);
int msgId = mqttMsg.variableHeader().packetId(); int msgId = mqttMsg.variableHeader().packetId();
try { try {
SparkplugTopic sparkplugTopic = parseTopicPublish(topicName); SparkplugTopic sparkplugTopic = parseTopicPublish(topicName);
String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId(); String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId();

View File

@ -21,9 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonParser; import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException; import com.google.gson.JsonSyntaxException;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
@ -34,7 +32,6 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -195,17 +192,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
} }
} }
public MqttPublishMessage reCreateMqttPublishMessageWithPacketId(MqttPublishMessage mqttMsgOld) {
try {
SparkplugBProto.Payload sparkplugBProto = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsgOld.payload()));
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(mqttMsgOld.variableHeader().topicName(), (int) sparkplugBProto.getSeq());
return new MqttPublishMessage(mqttMsgOld.fixedHeader(), variableHeader, mqttMsgOld.payload());
} catch (InvalidProtocolBufferException e) {
log.error("Failed to deserialize SparkplugBProto.Payload", e);
throw new RuntimeException("Failed to deserialize SparkplugBProto.Payload");
}
}
public void onDeviceConnectProto(MqttPublishMessage mqttPublishMessage, String nodeDeviceType) throws ThingsboardException { public void onDeviceConnectProto(MqttPublishMessage mqttPublishMessage, String nodeDeviceType) throws ThingsboardException {
try { try {
String topic = mqttPublishMessage.variableHeader().topicName(); String topic = mqttPublishMessage.variableHeader().topicName();

View File

@ -17,26 +17,23 @@ package org.thingsboard.server.transport.mqtt.util.sparkplug;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.ser.std.FileSerializer; import com.fasterxml.jackson.databind.ser.std.FileSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
import java.math.BigInteger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import static org.thingsboard.common.util.JacksonUtil.newArrayNode;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes;
/** /**
* Provides utility methods for SparkplugB MQTT Payload Metric. * Provides utility methods for SparkplugB MQTT Payload Metric.
*/ */
@ -46,122 +43,153 @@ public class SparkplugMetricUtil {
public static Optional<TransportProtos.KeyValueProto> getFromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { public static Optional<TransportProtos.KeyValueProto> getFromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
// Check if the null flag has been set indicating that the value is null // Check if the null flag has been set indicating that the value is null
if (protoMetric.getIsNull()) { if (protoMetric.getIsNull()) {
return null; return Optional.empty();
} }
// Otherwise convert the value based on the type // Otherwise convert the value based on the type
int metricType = protoMetric.getDatatype(); int metricType = protoMetric.getDatatype();
switch (MetricDataType.fromInteger(metricType)) { TransportProtos.KeyValueProto.Builder builderProto = TransportProtos.KeyValueProto.newBuilder();
ArrayNode nodeArray = newArrayNode();
MetricDataType metricDataType = MetricDataType.fromInteger(metricType);
switch (metricDataType) {
case Boolean: case Boolean:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V)
.setBoolV(protoMetric.getBooleanValue()).build()); .setBoolV(protoMetric.getBooleanValue()).build());
case DateTime: case DateTime:
case Int64: case Int64:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getLongValue()).build()); .setLongV(protoMetric.getLongValue()).build());
case File:
String filename = protoMetric.getMetadata().getFileName();
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build());
case Float: case Float:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV((long) protoMetric.getFloatValue()).build()); .setLongV((long) protoMetric.getFloatValue()).build());
case Double: case Double:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(protoMetric.getDoubleValue()).build()); .setDoubleV(protoMetric.getDoubleValue()).build());
case Int8: case Int8:
case UInt8: case UInt8:
case Int16: case Int16:
case Int32: case Int32:
case UInt16: case UInt16:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build()); .setLongV(protoMetric.getIntValue()).build());
case UInt32: case UInt32:
case UInt64:
if (protoMetric.hasIntValue()) { if (protoMetric.hasIntValue()) {
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getIntValue()).build()); .setLongV(protoMetric.getIntValue()).build());
} else if (protoMetric.hasLongValue()) { } else if (protoMetric.hasLongValue()) {
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(protoMetric.getLongValue()).build()); .setLongV(protoMetric.getLongValue()).build());
} else { } else {
log.error("Invalid value for UInt32 datatype"); log.error("Invalid value for UInt32 datatype");
throw new ThingsboardException("Invalid value for UInt32 datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
} }
case UInt64:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV((new BigInteger(Long.toUnsignedString(protoMetric.getLongValue()))).longValue()).build());
case String: case String:
case Text: case Text:
case UUID: case UUID:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoMetric.getStringValue()).build()); .setStringV(protoMetric.getStringValue()).build());
case Bytes: case Bytes:
case BooleanArray:
case Int8Array: case Int8Array:
case Int16Array: case Int16Array:
case Int32Array: case Int32Array:
case Int64Array:
case UInt8Array: case UInt8Array:
case UInt16Array: case UInt16Array:
case UInt32Array:
case UInt64Array:
case FloatArray: case FloatArray:
case DoubleArray: case DoubleArray:
case BooleanArray: case DateTimeArray:
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) case Int64Array:
.setStringV(Hex.encodeHexString(protoMetric.getBytesValue().toByteArray())).build()); case UInt64Array:
case DataSet: case UInt32Array:
SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
//TODO if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) {
// Build the and create the DataSet byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
/** }
return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) while (byteBuffer.hasRemaining()) {
.addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) setValueToNodeArray(nodeArray, byteBuffer, metricType);
.addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) }
.createDataSet(); return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
**/ .setJsonV(nodeArray.toString()).build());
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(protoDataSet.toString()).build());
case Template:
//TODO
// Build the and create the Template
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV( protoTemplate.toString()).build());
case StringArray: case StringArray:
ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
List<String> stringList = new ArrayList<>();
stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
while (stringByteBuffer.hasRemaining()) { while (stringByteBuffer.hasRemaining()) {
byte b = stringByteBuffer.get(); byte b = stringByteBuffer.get();
if (b == (byte) 0) { if (b == (byte) 0) {
stringList.add(sb.toString()); nodeArray.add(sb.toString());
sb = new StringBuilder(); sb = new StringBuilder();
} else { } else {
sb.append((char) b); sb.append((char) b);
} }
} }
String st = StringUtils.join(stringList, "|"); return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) .setJsonV(nodeArray.toString()).build());
.setStringV(st).build()); case DataSet:
case DateTimeArray: case Template:
ByteBuffer dateTimeByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); case File:
List<Long> dateTimeList = new ArrayList<Long>(); //TODO
dateTimeByteBuffer.order(ByteOrder.LITTLE_ENDIAN); // Build the and create the DataSet
while (dateTimeByteBuffer.hasRemaining()) { /**
long longValue = dateTimeByteBuffer.getLong(); SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue();
dateTimeList.add(longValue); return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList())
} .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList()))
Gson gson = new GsonBuilder().create(); .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList()))
JsonArray dateTimeArray = gson.toJsonTree(dateTimeList).getAsJsonArray(); .createDataSet();
return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.JSON_V) return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(dateTimeArray.toString()).build()); .setStringV(protoDataSet.toString()).build());
**/
//TODO
// Build the and create the Template
/**
SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue();
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV( protoTemplate.toString()).build());
**/
//TODO
// Build the and create the File
/**
String filename = protoMetric.getMetadata().getFileName();
return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V)
.setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build());
**/
return Optional.empty();
case Unknown: case Unknown:
default: default:
throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
} }
} }
private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) {
switch (MetricDataType.fromInteger(metricType)) {
case Bytes:
nodeArray.add(byteBuffer.get());
break;
case BooleanArray:
nodeArray.add(byteBuffer.get() == (byte) 0 ? "false" : "true");
break;
case Int8Array:
case Int16Array:
case Int32Array:
case UInt8Array:
case UInt16Array:
nodeArray.add(byteBuffer.getInt());
break;
case FloatArray:
nodeArray.add(byteBuffer.getFloat());
break;
case DoubleArray:
nodeArray.add(byteBuffer.getDouble());
break;
case DateTimeArray:
case Int64Array:
case UInt64Array:
case UInt32Array:
nodeArray.add(byteBuffer.getLong());
}
}
@JsonIgnoreProperties( @JsonIgnoreProperties(
value = {"fileName"}) value = {"fileName"})

View File

@ -155,6 +155,14 @@ public class JacksonUtil {
return mapper.createObjectNode(); return mapper.createObjectNode();
} }
public static ArrayNode newArrayNode() {
return newArrayNode(OBJECT_MAPPER);
}
public static ArrayNode newArrayNode(ObjectMapper mapper) {
return mapper.createArrayNode();
}
public static <T> T clone(T value) { public static <T> T clone(T value) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Class<T> valueClass = (Class<T>) value.getClass(); Class<T> valueClass = (Class<T>) value.getClass();