diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 24012e1e7a..2a4dcbae16 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -330,6 +330,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (gatewaySessionHandler != null) { handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); + } else { + log.error("[gatewaySessionHandler] is null, [{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId); } } else if (sparkplugSessionHandler != null) { handleSparkplugPublishMsg(ctx, topicName, mqttMsg); @@ -375,10 +377,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsgOld) { - MqttPublishMessage mqttMsg = sparkplugSessionHandler.reCreateMqttPublishMessageWithPacketId(mqttMsgOld); + private void handleSparkplugPublishMsg(ChannelHandlerContext ctx, String topicName, MqttPublishMessage mqttMsg) { int msgId = mqttMsg.variableHeader().packetId(); - try { SparkplugTopic sparkplugTopic = parseTopicPublish(topicName); String deviceName = sparkplugTopic.isNode() ? deviceSessionCtx.getDeviceInfo().getDeviceName() : sparkplugTopic.getDeviceId(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 547139ebfb..9c0351b189 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -21,9 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.StringUtils; @@ -34,7 +32,6 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; -import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.annotation.Nullable; @@ -195,17 +192,6 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public MqttPublishMessage reCreateMqttPublishMessageWithPacketId(MqttPublishMessage mqttMsgOld) { - try { - SparkplugBProto.Payload sparkplugBProto = SparkplugBProto.Payload.parseFrom(ProtoMqttAdaptor.toBytes(mqttMsgOld.payload())); - MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(mqttMsgOld.variableHeader().topicName(), (int) sparkplugBProto.getSeq()); - return new MqttPublishMessage(mqttMsgOld.fixedHeader(), variableHeader, mqttMsgOld.payload()); - } catch (InvalidProtocolBufferException e) { - log.error("Failed to deserialize SparkplugBProto.Payload", e); - throw new RuntimeException("Failed to deserialize SparkplugBProto.Payload"); - } - } - public void onDeviceConnectProto(MqttPublishMessage mqttPublishMessage, String nodeDeviceType) throws ThingsboardException { try { String topic = mqttPublishMessage.variableHeader().topicName(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index 6b575ba306..53d173b7fb 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -17,26 +17,23 @@ package org.thingsboard.server.transport.mqtt.util.sparkplug; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.ser.std.FileSerializer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; -import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Optional; +import static org.thingsboard.common.util.JacksonUtil.newArrayNode; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.BooleanArray; +import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.Bytes; + /** * Provides utility methods for SparkplugB MQTT Payload Metric. */ @@ -46,122 +43,153 @@ public class SparkplugMetricUtil { public static Optional getFromSparkplugBMetricToKeyValueProto(String key, SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { // Check if the null flag has been set indicating that the value is null if (protoMetric.getIsNull()) { - return null; + return Optional.empty(); } // Otherwise convert the value based on the type int metricType = protoMetric.getDatatype(); - switch (MetricDataType.fromInteger(metricType)) { + TransportProtos.KeyValueProto.Builder builderProto = TransportProtos.KeyValueProto.newBuilder(); + ArrayNode nodeArray = newArrayNode(); + MetricDataType metricDataType = MetricDataType.fromInteger(metricType); + switch (metricDataType) { case Boolean: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.BOOLEAN_V) .setBoolV(protoMetric.getBooleanValue()).build()); case DateTime: case Int64: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV(protoMetric.getLongValue()).build()); - case File: - String filename = protoMetric.getMetadata().getFileName(); - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build()); case Float: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV((long) protoMetric.getFloatValue()).build()); case Double: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) .setDoubleV(protoMetric.getDoubleValue()).build()); case Int8: case UInt8: case Int16: case Int32: case UInt16: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV(protoMetric.getIntValue()).build()); case UInt32: + case UInt64: if (protoMetric.hasIntValue()) { - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV(protoMetric.getIntValue()).build()); } else if (protoMetric.hasLongValue()) { - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) .setLongV(protoMetric.getLongValue()).build()); } else { log.error("Invalid value for UInt32 datatype"); - throw new ThingsboardException("Invalid value for UInt32 datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); + throw new ThingsboardException("Invalid value for " + MetricDataType.fromInteger(metricType).name() + " datatype " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); } - case UInt64: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) - .setDoubleV((new BigInteger(Long.toUnsignedString(protoMetric.getLongValue()))).longValue()).build()); case String: case Text: case UUID: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) .setStringV(protoMetric.getStringValue()).build()); case Bytes: + case BooleanArray: case Int8Array: case Int16Array: case Int32Array: - case Int64Array: case UInt8Array: case UInt16Array: - case UInt32Array: - case UInt64Array: case FloatArray: case DoubleArray: - case BooleanArray: - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(Hex.encodeHexString(protoMetric.getBytesValue().toByteArray())).build()); - case DataSet: - SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); - //TODO - // Build the and create the DataSet - /** - return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) - .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) - .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) - .createDataSet(); - **/ - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(protoDataSet.toString()).build()); - case Template: - //TODO - // Build the and create the Template - SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV( protoTemplate.toString()).build()); + case DateTimeArray: + case Int64Array: + case UInt64Array: + case UInt32Array: + ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); + if (!(metricDataType.equals(Bytes) || metricDataType.equals(BooleanArray))) { + byteBuffer.order(ByteOrder.LITTLE_ENDIAN); + } + while (byteBuffer.hasRemaining()) { + setValueToNodeArray(nodeArray, byteBuffer, metricType); + } + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); case StringArray: ByteBuffer stringByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - List stringList = new ArrayList<>(); stringByteBuffer.order(ByteOrder.LITTLE_ENDIAN); StringBuilder sb = new StringBuilder(); while (stringByteBuffer.hasRemaining()) { byte b = stringByteBuffer.get(); if (b == (byte) 0) { - stringList.add(sb.toString()); + nodeArray.add(sb.toString()); sb = new StringBuilder(); } else { - sb.append((char) b); } } - String st = StringUtils.join(stringList, "|"); - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.STRING_V) - .setStringV(st).build()); - case DateTimeArray: - ByteBuffer dateTimeByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - List dateTimeList = new ArrayList(); - dateTimeByteBuffer.order(ByteOrder.LITTLE_ENDIAN); - while (dateTimeByteBuffer.hasRemaining()) { - long longValue = dateTimeByteBuffer.getLong(); - dateTimeList.add(longValue); - } - Gson gson = new GsonBuilder().create(); - JsonArray dateTimeArray = gson.toJsonTree(dateTimeList).getAsJsonArray(); - return Optional.of(TransportProtos.KeyValueProto.newBuilder().setKey(key).setType(TransportProtos.KeyValueType.JSON_V) - .setStringV(dateTimeArray.toString()).build()); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) + .setJsonV(nodeArray.toString()).build()); + case DataSet: + case Template: + case File: + //TODO + // Build the and create the DataSet + /** + SparkplugBProto.Payload.DataSet protoDataSet = protoMetric.getDatasetValue(); + return new SparkplugBProto.Payload.DataSet.Builder(protoDataSet.getNumOfColumns()).addColumnNames(protoDataSet.getColumnsList()) + .addTypes(convertDataSetDataTypes(protoDataSet.getTypesList())) + .addRows(convertDataSetRows(protoDataSet.getRowsList(), protoDataSet.getTypesList())) + .createDataSet(); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(protoDataSet.toString()).build()); + **/ + //TODO + // Build the and create the Template + /** + SparkplugBProto.Payload.Template protoTemplate = protoMetric.getTemplateValue(); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV( protoTemplate.toString()).build()); + **/ + //TODO + // Build the and create the File + /** + String filename = protoMetric.getMetadata().getFileName(); + return Optional.of(builderPrbyteValueoto.setKey(key + "_" + filename).setType(TransportProtos.KeyValueType.STRING_V) + .setStringV(Hex.encodeHexString((protoMetric.getBytesValue().toByteArray()))).build()); + **/ + return Optional.empty(); case Unknown: default: throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); } + + } + private static void setValueToNodeArray(ArrayNode nodeArray, ByteBuffer byteBuffer, int metricType) { + switch (MetricDataType.fromInteger(metricType)) { + case Bytes: + nodeArray.add(byteBuffer.get()); + break; + case BooleanArray: + nodeArray.add(byteBuffer.get() == (byte) 0 ? "false" : "true"); + break; + case Int8Array: + case Int16Array: + case Int32Array: + case UInt8Array: + case UInt16Array: + nodeArray.add(byteBuffer.getInt()); + break; + case FloatArray: + nodeArray.add(byteBuffer.getFloat()); + break; + case DoubleArray: + nodeArray.add(byteBuffer.getDouble()); + break; + case DateTimeArray: + case Int64Array: + case UInt64Array: + case UInt32Array: + nodeArray.add(byteBuffer.getLong()); + } + } @JsonIgnoreProperties( value = {"fileName"}) diff --git a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java index ec464caf7b..55feab05c2 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java @@ -155,6 +155,14 @@ public class JacksonUtil { return mapper.createObjectNode(); } + public static ArrayNode newArrayNode() { + return newArrayNode(OBJECT_MAPPER); + } + + public static ArrayNode newArrayNode(ObjectMapper mapper) { + return mapper.createArrayNode(); + } + public static T clone(T value) { @SuppressWarnings("unchecked") Class valueClass = (Class) value.getClass();