From bcdc618e4817036b29c6f640ec55f27ec499f19d Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Fri, 3 Feb 2023 16:10:54 +0200 Subject: [PATCH] sparkplug: add converter kvProto to value Metric --- .../transport/mqtt/MqttTransportHandler.java | 6 +- .../MqttDeviceAwareSessionContext.java | 15 ++-- .../SparkplugDeviceSessionContext.java | 4 +- .../session/SparkplugNodeSessionHandler.java | 23 +++--- .../util/sparkplug/SparkplugMetricUtil.java | 80 ++++++++++++------- 5 files changed, 78 insertions(+), 50 deletions(-) diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 92333467d9..31037b075d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -393,7 +393,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement // TODO break; case NBIRTH: - sparkplugSessionHandler.setMetricsBirthNode (sparkplugBProtoNode.getMetricsList()); + sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList()); sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic); break; case NCMD: @@ -1265,12 +1265,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (sparkplugSessionHandler != null) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); notification.getSharedUpdatedList().forEach(tsKvProto -> { - if (sparkplugSessionHandler.getMetricsBirthNode().containsKey(tsKvProto.getKv().getKey())) { + if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), SparkplugMessageType.NCMD); sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, sparkplugTopic.toString(), - sparkplugSessionHandler.getMetricsBirthNode().get(tsKvProto.getKv().getKey())) + sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(sparkplugSessionHandler::writeAndFlush); } }); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index 5b4a9a4c95..12b4ba8024 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -32,24 +32,27 @@ import java.util.stream.Collectors; public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { private final ConcurrentMap mqttQoSMap; - private final Map metricsBirthDevice; + private Map deviceBirthMetrics; public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap mqttQoSMap) { super(sessionId); this.mqttQoSMap = mqttQoSMap; - this.metricsBirthDevice = new ConcurrentHashMap<>(); + this.deviceBirthMetrics = null; } public ConcurrentMap getMqttQoSMap() { return mqttQoSMap; } - public Map getMetricsBirthDevice() { - return metricsBirthDevice; + public Map getDeviceBirthMetrics() { + return deviceBirthMetrics; } - public void setMetricsBirthDevice(java.util.List metrics) { - this.metricsBirthDevice.putAll(metrics.stream() + public void setDeviceBirthMetrics(java.util.List metrics) { + if (this.deviceBirthMetrics == null) { + this.deviceBirthMetrics = new ConcurrentHashMap<>(); + } + this.deviceBirthMetrics.putAll(metrics.stream() .collect(Collectors.toMap(metric -> metric.getName(), metric -> metric))); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java index a89dee6dd5..4416501dea 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugDeviceSessionContext.java @@ -42,12 +42,12 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{ public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) { log.trace("[{}] Received attributes update notification to sparkplug device", sessionId); notification.getSharedUpdatedList().forEach(tsKvProto -> { - if (getMetricsBirthDevice().containsKey(tsKvProto.getKv().getKey())) { + if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) { SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(), SparkplugMessageType.DCMD, deviceInfo.getDeviceName()); ((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto, sparkplugTopic.toString(), - getMetricsBirthDevice().get(tsKvProto.getKv().getKey())) + getDeviceBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(parent::writeAndFlush); } }); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 50292e8d57..4a379e8a85 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -56,22 +56,22 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetr public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { private final SparkplugTopic sparkplugTopicNode; - private final Map metricsBirthNode; + private final Map nodeBirthMetrics; public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, SparkplugTopic sparkplugTopicNode) { super(deviceSessionCtx, sessionId); this.sparkplugTopicNode = sparkplugTopicNode; - this.metricsBirthNode = new ConcurrentHashMap<>(); + this.nodeBirthMetrics = new ConcurrentHashMap<>(); } - public void setMetricsBirthNode(java.util.List metrics) { - this.metricsBirthNode.putAll(metrics.stream() + public void setNodeBirthMetrics(java.util.List metrics) { + this.nodeBirthMetrics.putAll(metrics.stream() .collect(Collectors.toMap(metric -> metric.getName(), metric -> metric))); } - public Map getMetricsBirthNode() { - return this.metricsBirthNode; + public Map getNodeBirthMetrics() { + return this.nodeBirthMetrics; } public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { @@ -93,7 +93,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { List msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name()); if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) { try { - contextListenableFuture.get().setMetricsBirthDevice(sparkplugBProto.getMetricsList()); + contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList()); } catch (InterruptedException | ExecutionException e) { log.error("Failed add Metrics. MessageType *BIRTH.", e); } @@ -206,10 +206,13 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler { cmdPayload.addMetrics(createMetric(value.get(), ts, tsKvProto.getKv().getKey(), metricDataType)); byte[] payloadInBytes = cmdPayload.build().toByteArray(); return Optional.of(getPayloadAdaptor().createMqttPublishMsg(deviceSessionCtx, sparkplugTopic, payloadInBytes)); + } else { + log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes [{}] response to MQTT sparkplug msg", + deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, tsKvProto.getKv()); } - } catch ( - Exception e) { - log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e); + } catch (Exception e) { + log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes response to MQTT sparkplug msg", + deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, e); return Optional.empty(); } return Optional.empty(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java index c2a9059bf5..0b36e1fe90 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/SparkplugMetricUtil.java @@ -232,25 +232,19 @@ public class SparkplugMetricUtil { case Int16: case UInt8: case UInt16: - int valueMetric = Integer.valueOf(String.valueOf(value)); - return metric.toBuilder().setIntValue(valueMetric).build(); case Int32: + return metric.toBuilder().setIntValue(((Integer) value).intValue()).build(); case UInt32: - if (value instanceof Long) { - return metric.toBuilder().setLongValue((long) value).build(); - } else { - return metric.toBuilder().setIntValue((int) value).build(); - } case Int64: case UInt64: case DateTime: - return metric.toBuilder().setLongValue((long) value).build(); + return metric.toBuilder().setLongValue(((Long) value).longValue()).build(); case Float: - return metric.toBuilder().setFloatValue((float) value).build(); + return metric.toBuilder().setFloatValue(((Float) value).floatValue()).build(); case Double: - return metric.toBuilder().setDoubleValue((double) value).build(); + return metric.toBuilder().setDoubleValue(((Double) value).doubleValue()).build(); case Boolean: - return metric.toBuilder().setBooleanValue((boolean) value).build(); + return metric.toBuilder().setBooleanValue(((Boolean) value).booleanValue()).build(); case String: case Text: case UUID: @@ -309,17 +303,17 @@ public class SparkplugMetricUtil { return metric; } - public static Optional validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { + public static Optional validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException { if (kv.getTypeValue() <= 3) { return validatedValuePrimitiveByTypeMetric(kv, metricDataType); } else if (kv.getTypeValue() == 4) { return validatedValueJsonByTypeMetric(kv, metricDataType); } else { - return Optional.empty(); + throw new ThingsboardException("Invalid type KeyValueProto " + kv.toString() + " for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS); } } - public static Optional validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) { + public static Optional validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException { Optional valueOpt = getValueKvProtoPrimitive(kv); if (valueOpt.isPresent()) { try { @@ -329,26 +323,47 @@ public class SparkplugMetricUtil { case Int16: case UInt8: case UInt16: - return Optional.of(Integer.valueOf(valueOpt.get())); - // int/long case Int32: - case UInt32: + Optional boolInt8 = booleanStringToInt (valueOpt.get()); + if(boolInt8.isPresent()) { + return Optional.of(boolInt8.get()); + } try { return Optional.of(Integer.valueOf(valueOpt.get())); - } catch (NumberFormatException e) { - return Optional.of(Long.valueOf(valueOpt.get())); + } catch (NumberFormatException eInt) { + var i = new BigDecimal(valueOpt.get()); + if (i.longValue() <= Integer.MAX_VALUE) { + return Optional.of(i.intValue()); + } + throw new ThingsboardException("Invalid type value " + kv.toString() + " for MetricDataType " + + metricDataType.name(), eInt, ThingsboardErrorCode.INVALID_ARGUMENTS); } // long + case UInt32: case Int64: case UInt64: case DateTime: - return Optional.of(Long.valueOf(valueOpt.get())); + Optional boolInt64 = booleanStringToInt (valueOpt.get()); + if(boolInt64.isPresent()) { + return Optional.of(Long.valueOf(boolInt64.get())); + } + var l = new BigDecimal(valueOpt.get()); + return Optional.of(l.longValue()); // float case Float: + Optional boolFloat = booleanStringToInt (valueOpt.get()); + if(boolFloat.isPresent()) { + var fb = new BigDecimal(boolFloat.get()); + return Optional.of(fb.floatValue()); + } var f = new BigDecimal(valueOpt.get()); return Optional.of(f.floatValue()); // double case Double: + Optional boolDouble = booleanStringToInt (valueOpt.get()); + if(boolDouble.isPresent()) { + return Optional.of(Double.valueOf(boolDouble.get())); + } var dd = new BigDecimal(valueOpt.get()); return Optional.of(dd.doubleValue()); case Boolean: @@ -367,14 +382,11 @@ public class SparkplugMetricUtil { case String: case Text: case UUID: - if (kv.getTypeValue() == 4) { - return Optional.of(valueOpt.get()); - } - break; + return Optional.of(valueOpt.get()); } } catch (Exception e) { - log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); - return Optional.empty(); + log.trace("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage()); + throw new ThingsboardException("Invalid type value " + kv.toString() + " for MetricDataType " + metricDataType.name(), e, ThingsboardErrorCode.INVALID_ARGUMENTS); } } return Optional.empty(); @@ -597,10 +609,20 @@ public class SparkplugMetricUtil { return Optional.of(String.valueOf(kv.getBoolV())); } else if (kv.getTypeValue() == 1) { // kvLong return Optional.of(String.valueOf(kv.getLongV())); - } else if (kv.getTypeValue() == 2) { // kvString - return Optional.of(kv.getStringV()); - } else if (kv.getTypeValue() == 3) { // kvDouble + } else if (kv.getTypeValue() == 2) { // kvDouble/float return Optional.of(String.valueOf(kv.getDoubleV())); + } else if (kv.getTypeValue() == 3) { // kvString + return Optional.of(kv.getStringV()); + } else { + return Optional.empty(); + } + } + + private static Optional booleanStringToInt (String booleanStr) { + if ("true".equals(booleanStr)) { + return Optional.of(1); + } else if ("false".equals(booleanStr)) { + return Optional.of(0); } else { return Optional.empty(); }