diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e53dcd520b..92333467d9 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -81,6 +81,7 @@ import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import javax.net.ssl.SSLPeerUnverifiedException; @@ -1263,7 +1264,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement try { if (sparkplugSessionHandler != null) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); - sparkplugSessionHandler.createMqttPublishMsg(deviceSessionCtx, notification).ifPresent(sparkplugSessionHandler::writeAndFlush); + notification.getSharedUpdatedList().forEach(tsKvProto -> { + if (sparkplugSessionHandler.getMetricsBirthNode().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), + SparkplugMessageType.NCMD); + sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + sparkplugSessionHandler.getMetricsBirthNode().get(tsKvProto.getKv().getKey())) + .ifPresent(sparkplugSessionHandler::writeAndFlush); + } + }); } else { adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index a6f0d0d247..a89dee6dd5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; +import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -39,7 +41,16 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ @Override public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); - ((SparkplugNodeSessionHandler)parent).createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush); + notification.getSharedUpdatedList().forEach(tsKvProto -> { + if (getMetricsBirthDevice().containsKey(tsKvProto.getKv().getKey())) { + SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(), + SparkplugMessageType.DCMD, deviceInfo.getDeviceName()); + ((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto, + sparkplugTopic.toString(), + getMetricsBirthDevice().get(tsKvProto.getKv().getKey())) + .ifPresent(parent::writeAndFlush); + } + }); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index d72a69f111..50292e8d57 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -86,10 +86,10 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } } - public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { + public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException { checkDeviceName(deviceName); ListenableFuture contextListenableFuture = topic.isNode() ? - Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); + Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName); List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { try { @@ -139,7 +139,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { private ListenableFuture onDeviceConnectProto(String deviceName) throws ThingsboardException { try { - String deviceType = this.gateway.getDeviceType() + "-node"; + String deviceType = this.gateway.getDeviceType() + "-node"; return onDeviceConnect(deviceName, deviceType); } catch (RuntimeException e) { log.error("Failed Sparkplug Device connect proto!", e); @@ -152,9 +152,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { List msgs = new ArrayList<>(); for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { long ts = protoMetric.getTimestamp(); - String keys = "bdSeq".equals(protoMetric.getName()) ? + String key = "bdSeq".equals(protoMetric.getName()) ? topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); - Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(keys, protoMetric); + Optional keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric); if (keyValueProtoOpt.isPresent()) { List result = new ArrayList<>(); result.add(keyValueProtoOpt.get()); @@ -166,6 +166,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { msgs.add(request.build()); } } + if (DBIRTH.name().equals(topicTypeName)) { List result = new ArrayList<>(); TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder(); @@ -192,29 +193,22 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { } - public Optional createMqttPublishMsg (MqttDeviceAwareSessionContext ctx, - TransportProtos.AttributeUpdateNotificationMsg notification, - String... deviceName) { + public Optional createSparkplugMqttPublishMsg(TransportProtos.TsKvProto tsKvProto, + String sparkplugTopic, + SparkplugBProto.Payload.Metric metricBirth) { try { - long ts = notification.getSharedUpdated(0).getTs(); - String key = notification.getSharedUpdated(0).getKv().getKey(); - if (metricsBirthNode.containsKey(key)) { - SparkplugBProto.Payload.Metric metricBirth = metricsBirthNode.get(key); - MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype()); - Optional value = validatedValueByTypeMetric(notification.getSharedUpdated(0).getKv(), metricDataType); - if (value.isPresent()) { - SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder() - .setTimestamp(ts); - cmdPayload.addMetrics(createMetric(value, ts, key, metricDataType)); - byte[] payloadInBytes = cmdPayload.build().toByteArray(); - String topic = deviceName == null ? sparkplugTopicNode.toString() : sparkplugTopicNode.toString() - + "/" + deviceName; - return Optional.of(getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes)); - } - } else { - return Optional.empty(); + long ts = tsKvProto.getTs(); + MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype()); + Optional value = validatedValueByTypeMetric(tsKvProto.getKv(), metricDataType); + if (value.isPresent()) { + SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder() + .setTimestamp(ts); + cmdPayload.addMetrics(createMetric(value.get(), ts, tsKvProto.getKv().getKey(), metricDataType)); + byte[] payloadInBytes = cmdPayload.build().toByteArray(); + return Optional.of(getPayloadAdaptor().createMqttPublishMsg(deviceSessionCtx, sparkplugTopic, payloadInBytes)); } - } catch (Exception e) { + } catch ( + Exception e) { log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e); return Optional.empty(); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index c89c21b01a..c2a9059bf5 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.ser.std.FileSerializer; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.gen.transport.TransportProtos; @@ -39,6 +40,7 @@ import java.nio.FloatBuffer; import java.nio.IntBuffer; import java.nio.LongBuffer; import java.nio.ShortBuffer; +import java.text.NumberFormat; import java.util.Arrays; import java.util.Optional; @@ -74,8 +76,8 @@ public class SparkplugMetricUtil { return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) .setDoubleV(f.doubleValue()).build()); case Double: - return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V) - .setDoubleV(protoMetric.getDoubleValue()).build()); + return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V) + .setLongV(Double.valueOf(protoMetric.getDoubleValue()).longValue()).build()); case Int8: case UInt8: case Int16: @@ -103,7 +105,7 @@ public class SparkplugMetricUtil { // byte[] case BooleanArray: ByteBuffer booleanByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - while (booleanByteBuffer.hasRemaining()){ + while (booleanByteBuffer.hasRemaining()) { nodeArray.add(booleanByteBuffer.get() == (byte) 0 ? false : true); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -112,7 +114,7 @@ public class SparkplugMetricUtil { case Bytes: case Int8Array: ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()); - while (byteBuffer.hasRemaining()){ + while (byteBuffer.hasRemaining()) { nodeArray.add(byteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -122,7 +124,7 @@ public class SparkplugMetricUtil { case UInt8Array: ShortBuffer shortByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) .asShortBuffer(); - while (shortByteBuffer.hasRemaining()){ + while (shortByteBuffer.hasRemaining()) { nodeArray.add(shortByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -132,7 +134,7 @@ public class SparkplugMetricUtil { case UInt16Array: IntBuffer intByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) .asIntBuffer(); - while (intByteBuffer.hasRemaining()){ + while (intByteBuffer.hasRemaining()) { nodeArray.add(intByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -141,7 +143,7 @@ public class SparkplugMetricUtil { case FloatArray: FloatBuffer floatByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) .asFloatBuffer(); - while (floatByteBuffer.hasRemaining()){ + while (floatByteBuffer.hasRemaining()) { nodeArray.add(floatByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -150,7 +152,7 @@ public class SparkplugMetricUtil { case DoubleArray: DoubleBuffer doubleByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) .asDoubleBuffer(); - while (doubleByteBuffer.hasRemaining()){ + while (doubleByteBuffer.hasRemaining()) { nodeArray.add(doubleByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -162,7 +164,7 @@ public class SparkplugMetricUtil { case UInt32Array: LongBuffer longByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray()) .asLongBuffer(); - while (longByteBuffer.hasRemaining()){ + while (longByteBuffer.hasRemaining()) { nodeArray.add(longByteBuffer.get()); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -175,7 +177,7 @@ public class SparkplugMetricUtil { new ObjectInputStream(byteArrayInputStream); final String[] stringArray = (String[]) objectInputStream.readObject(); objectInputStream.close(); - for (String s: stringArray) { + for (String s : stringArray) { nodeArray.add(s); } return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V) @@ -213,13 +215,13 @@ public class SparkplugMetricUtil { default: throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS); } - } catch (Exception e){ + } catch (Exception e) { log.error("", e); - return Optional.empty(); + return Optional.empty(); } } - public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException { + public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException { SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder() .setTimestamp(ts) .setName(key) @@ -237,7 +239,7 @@ public class SparkplugMetricUtil { if (value instanceof Long) { return metric.toBuilder().setLongValue((long) value).build(); } else { - return metric.toBuilder().setIntValue((int)value).build(); + return metric.toBuilder().setIntValue((int) value).build(); } case Int64: case UInt64: @@ -307,129 +309,224 @@ public class SparkplugMetricUtil { return metric; } - public static Optional validatedValueByTypeMetric (TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { - try { - switch (metricDataType) { - case Int8: - case Int16: - case UInt8: - case UInt16: - case Int32: - case UInt32: - case Int64: - case UInt64: - case DateTime: - if (kv.getTypeValue()==1) { - return Optional.of(Integer.valueOf(String.valueOf(kv.getLongV()))); - } - break; - case Float: - if (kv.getTypeValue()==2) { - var f = new BigDecimal(String.valueOf(kv.getDoubleV())); - return Optional.of(f.floatValue()); - } - break; - case Double: - if (kv.getTypeValue()==2) { - return Optional.of(kv.getLongV()); - } - break; - case Boolean: - if (kv.getTypeValue()==3) { - return Optional.of(kv.getBoolV()); - } - break; - case String: - case Text: - case UUID: - if (kv.getTypeValue()==4) { - return Optional.of(kv.getStringV()); - } - break; - case Bytes: - case Int8Array: - if (kv.getTypeValue()==5) { -// ByteString byteString = ByteString.copyFrom((byte[]) value); -// return metric.toBuilder().setBytesValue(byteString).build(); - return Optional.of(kv.getJsonV()); - } - break; - case Int16Array: - case UInt8Array: - if (kv.getTypeValue()==5) { -// byte[] int16Array = shortArrayToByteArray((short[]) value); -// ByteString byteInt16Array = ByteString.copyFrom((int16Array)); - return Optional.of(kv.getJsonV()); - } - break; - case Int32Array: - case UInt16Array: - case Int64Array: - case UInt32Array: - case UInt64Array: - case DateTimeArray: - if (kv.getTypeValue()==5) { -// if (value instanceof int[]) { -// byte[] int32Array = integerArrayToByteArray((int[]) value); -// ByteString byteInt32Array = ByteString.copyFrom((int32Array)); -// return metric.toBuilder().setBytesValue(byteInt32Array).build(); -// } else { -// byte[] int64Array = longArrayToByteArray((long[]) value); -// ByteString byteInt64Array = ByteString.copyFrom((int64Array)); -// return metric.toBuilder().setBytesValue(byteInt64Array).build(); -// } - return Optional.of(kv.getJsonV()); - } - break; - case DoubleArray: - if (kv.getTypeValue()==5) { -// byte[] doubleArray = doublArrayToByteArray((double[]) value); -// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); -// return metric.toBuilder().setBytesValue(byteDoubleArray).build(); - return Optional.of(kv.getJsonV()); - } - break; - case FloatArray: - if (kv.getTypeValue()==5) { -// byte[] floatArray = floatArrayToByteArray((float[]) value); -// ByteString byteFloatArray = ByteString.copyFrom(floatArray); -// return metric.toBuilder().setBytesValue(byteFloatArray).build(); - return Optional.of(kv.getJsonV()); - } - break; - case BooleanArray: - if (kv.getTypeValue()==5) { -// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); -// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); -// return metric.toBuilder().setBytesValue(byteBooleanArray).build(); - return Optional.of(kv.getJsonV()); - } - break; - case StringArray: - if (kv.getTypeValue()==5) { -// byte[] stringArray = stringArrayToByteArray((String[]) value); -// ByteString byteStringArray = ByteString.copyFrom(stringArray); -// return metric.toBuilder().setBytesValue(byteStringArray).build(); - return Optional.of(kv.getJsonV()); - } - break; - case DataSet: - case File: - case Template: - log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name()); - return Optional.empty(); - case Unknown: - log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name()); - return Optional.empty(); - } - } catch (Exception e) { - log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); + public static Optional validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { + if (kv.getTypeValue() <= 3) { + return validatedValuePrimitiveByTypeMetric(kv, metricDataType); + } else if (kv.getTypeValue() == 4) { + return validatedValueJsonByTypeMetric(kv, metricDataType); + } else { return Optional.empty(); } + } + + public static Optional validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { + Optional valueOpt = getValueKvProtoPrimitive(kv); + if (valueOpt.isPresent()) { + try { + switch (metricDataType) { + // int + case Int8: + case Int16: + case UInt8: + case UInt16: + return Optional.of(Integer.valueOf(valueOpt.get())); + // int/long + case Int32: + case UInt32: + try { + return Optional.of(Integer.valueOf(valueOpt.get())); + } catch (NumberFormatException e) { + return Optional.of(Long.valueOf(valueOpt.get())); + } + // long + case Int64: + case UInt64: + case DateTime: + return Optional.of(Long.valueOf(valueOpt.get())); + // float + case Float: + var f = new BigDecimal(valueOpt.get()); + return Optional.of(f.floatValue()); + // double + case Double: + var dd = new BigDecimal(valueOpt.get()); + return Optional.of(dd.doubleValue()); + case Boolean: + if ("true".equals(valueOpt.get())) { + return Optional.of(true); + } else if ("false".equals(valueOpt.get())) { + return Optional.of(false); + } else { + Number number = NumberFormat.getInstance().parse(valueOpt.get()); + if (StringUtils.isBlank(number.toString()) || "0".equals(number.toString())) { // ok 0 + return Optional.of(false); + } else { + return Optional.of(true); + } + } + case String: + case Text: + case UUID: + if (kv.getTypeValue() == 4) { + return Optional.of(valueOpt.get()); + } + break; + } + } catch (Exception e) { + log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); + return Optional.empty(); + } + } return Optional.empty(); } - private static byte[] shortArrayToByteArray(short[] inputs) { + + public static Optional validatedValueJsonByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { +// try { +// Optional valueOpt; +// switch (metricDataType) { +// // int +// case Int8: +// case Int16: +// case UInt8: +// case UInt16: +// valueOpt = getValueKvProtoPrimitive(kv); +// return valueOpt.isPresent() ? Optional.of(Integer.valueOf(String.valueOf(valueOpt.get()))) : valueOpt; +// // int/long +// case Int32: +// case UInt32: +// valueOpt = getValueKvProtoPrimitive(kv); +// try { +// return Optional.of(Integer.valueOf(String.valueOf(valueOpt.get()))); +// } catch (NumberFormatException e) { +// return Optional.of(Long.valueOf(String.valueOf(valueOpt.get()))); +// } +// // long +// case Int64: +// case UInt64: +// case DateTime: +// valueOpt = getValueKvProtoPrimitive(kv); +// return Optional.of(Long.valueOf(String.valueOf(valueOpt.get()))); +// // float +// case Float: +// valueOpt = getValueKvProtoPrimitive(kv); +// var f = new BigDecimal(String.valueOf(kv.getDoubleV())); +// return Optional.of(f.floatValue()); +// } +// break; +// // double +// case Double: +// if (kv.getTypeValue() == 1) { +// return Optional.of(kv.getLongV()); +// } +// break; +// case Boolean: +// if (kv.getTypeValue() == 0) { // ok 0 +// return Optional.of(kv.getBoolV()); +// } +// break; +// case String: +// case Text: +// case UUID: +// if (kv.getTypeValue() == 4) { +// return Optional.of(kv.getStringV()); +// } +// break; +// // byte[] +// case Bytes: +// case Int8Array: +// if (kv.getTypeValue() == 5) { +//// ByteString byteString = ByteString.copyFrom((byte[]) value); +//// return metric.toBuilder().setBytesValue(byteString).build(); +// return Optional.of(kv.getJsonV()); +// } +// break; +// // short[] +// case Int16Array: +// case UInt8Array: +// if (kv.getTypeValue() == 5) { +//// byte[] int16Array = shortArrayToByteArray((short[]) value); +//// ByteString byteInt16Array = ByteString.copyFrom((int16Array)); +// return Optional.of(kv.getJsonV()); +// } +// break; +// // int [] +// case UInt16Array: +// case Int32Array: +// +// // int[] / long[] +// case UInt32Array: +// +// // long[] +// case Int64Array: +// case UInt64Array: +// case DateTimeArray: +// if (kv.getTypeValue() == 5) { +//// if (value instanceof int[]) { +//// byte[] int32Array = integerArrayToByteArray((int[]) value); +//// ByteString byteInt32Array = ByteString.copyFrom((int32Array)); +//// return metric.toBuilder().setBytesValue(byteInt32Array).build(); +//// } else { +//// byte[] int64Array = longArrayToByteArray((long[]) value); +//// ByteString byteInt64Array = ByteString.copyFrom((int64Array)); +//// return metric.toBuilder().setBytesValue(byteInt64Array).build(); +//// } +// return Optional.of(kv.getJsonV()); +// } +// break; +// // double [] +// case DoubleArray: +// if (kv.getTypeValue() == 5) { +//// byte[] doubleArray = doublArrayToByteArray((double[]) value); +//// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray); +//// return metric.toBuilder().setBytesValue(byteDoubleArray).build(); +// return Optional.of(kv.getJsonV()); +// } +// break; +// // float[] +// case FloatArray: +// if (kv.getTypeValue() == 5) { +//// byte[] floatArray = floatArrayToByteArray((float[]) value); +//// ByteString byteFloatArray = ByteString.copyFrom(floatArray); +//// return metric.toBuilder().setBytesValue(byteFloatArray).build(); +// return Optional.of(kv.getJsonV()); +// } +// break; +// // boolean[] +// case BooleanArray: +// if (kv.getTypeValue() == 5) { +//// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value); +//// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray); +//// return metric.toBuilder().setBytesValue(byteBooleanArray).build(); +// return Optional.of(kv.getJsonV()); +// } +// break; +// // String [] +// case StringArray: +// if (kv.getTypeValue() == 5) { +//// byte[] stringArray = stringArrayToByteArray((String[]) value); +//// ByteString byteStringArray = ByteString.copyFrom(stringArray); +//// return metric.toBuilder().setBytesValue(byteStringArray).build(); +// return Optional.of(kv.getJsonV()); +// } +// break; +// case DataSet: +// case File: +// case Template: +// log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name()); +// return Optional.empty(); +// case Unknown: +// log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name()); +// return Optional.empty(); +// } catch (Exception e) { +// log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); +// return Optional.empty(); +// } + return Optional.empty(); + } + + + private static byte[] shortArrayToByteArray(short[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2); for (short d : inputs) { bb.putShort(d); @@ -437,7 +534,7 @@ public class SparkplugMetricUtil { return bb.array(); } - private static byte[] integerArrayToByteArray(int[] inputs) { + private static byte[] integerArrayToByteArray(int[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4); for (int d : inputs) { bb.putInt(d); @@ -445,7 +542,7 @@ public class SparkplugMetricUtil { return bb.array(); } - private static byte[] longArrayToByteArray(long[] inputs) { + private static byte[] longArrayToByteArray(long[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); for (long d : inputs) { bb.putLong(d); @@ -453,7 +550,7 @@ public class SparkplugMetricUtil { return bb.array(); } - private static byte[] doublArrayToByteArray(double[] inputs) { + private static byte[] doublArrayToByteArray(double[] inputs) { ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8); for (double d : inputs) { bb.putDouble(d); @@ -461,7 +558,7 @@ public class SparkplugMetricUtil { return bb.array(); } - private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException { + private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException { ByteArrayOutputStream bas = new ByteArrayOutputStream(); DataOutputStream ds = new DataOutputStream(bas); for (float f : inputs) { @@ -474,15 +571,15 @@ public class SparkplugMetricUtil { return bas.toByteArray(); } - private static byte[] booleanArrayToByteArray(boolean[] inputs) { + private static byte[] booleanArrayToByteArray(boolean[] inputs) { byte[] toReturn = new byte[inputs.length]; for (int entry = 0; entry < toReturn.length; entry++) { - toReturn[entry] = (byte) (inputs[entry]?1:0); + toReturn[entry] = (byte) (inputs[entry] ? 1 : 0); } return toReturn; } - private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException { + private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException { final ByteArrayOutputStream bas = new ByteArrayOutputStream(); try { final ObjectOutputStream os = new ObjectOutputStream(bas); @@ -495,6 +592,19 @@ public class SparkplugMetricUtil { return bas.toByteArray(); } + private static Optional getValueKvProtoPrimitive(TransportProtos.KeyValueProto kv) { + if (kv.getTypeValue() == 0) { // boolean + return Optional.of(String.valueOf(kv.getBoolV())); + } else if (kv.getTypeValue() == 1) { // kvLong + return Optional.of(String.valueOf(kv.getLongV())); + } else if (kv.getTypeValue() == 2) { // kvString + return Optional.of(kv.getStringV()); + } else if (kv.getTypeValue() == 3) { // kvDouble + return Optional.of(String.valueOf(kv.getDoubleV())); + } else { + return Optional.empty(); + } + } @JsonIgnoreProperties( value = {"fileName"}) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java index 373e253ed5..ace007abaf 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugTopic.java @@ -87,6 +87,23 @@ public class SparkplugTopic { this.type = type; } + public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) { + super(); + this.namespace = sparkplugTopic.namespace; + this.groupId = sparkplugTopic.groupId; + this.edgeNodeId = sparkplugTopic.edgeNodeId; + this.deviceId = null; + this.type = type; + } + public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) { + super(); + this.namespace = sparkplugTopic.namespace; + this.groupId = sparkplugTopic.groupId; + this.edgeNodeId = sparkplugTopic.edgeNodeId; + this.deviceId = deviceId; + this.type = type; + } + /** * Returns the Sparkplug namespace version. *