sparkplug: add converter kvProto to value Metric

This commit is contained in:
nickAS21 2023-02-03 16:10:54 +02:00
parent d6453525d3
commit bcdc618e48
5 changed files with 78 additions and 50 deletions

View File

@ -393,7 +393,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
// TODO
break;
case NBIRTH:
sparkplugSessionHandler.setMetricsBirthNode (sparkplugBProtoNode.getMetricsList());
sparkplugSessionHandler.setNodeBirthMetrics(sparkplugBProtoNode.getMetricsList());
sparkplugSessionHandler.onTelemetryProto(msgId, sparkplugBProtoNode, deviceName, sparkplugTopic);
break;
case NCMD:
@ -1265,12 +1265,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (sparkplugSessionHandler != null) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (sparkplugSessionHandler.getMetricsBirthNode().containsKey(tsKvProto.getKv().getKey())) {
if (sparkplugSessionHandler.getNodeBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
SparkplugMessageType.NCMD);
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
sparkplugSessionHandler.getMetricsBirthNode().get(tsKvProto.getKv().getKey()))
sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(sparkplugSessionHandler::writeAndFlush);
}
});

View File

@ -32,24 +32,27 @@ import java.util.stream.Collectors;
public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
private final Map<String, SparkplugBProto.Payload.Metric> metricsBirthDevice;
private Map<String, SparkplugBProto.Payload.Metric> deviceBirthMetrics;
public MqttDeviceAwareSessionContext(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap) {
super(sessionId);
this.mqttQoSMap = mqttQoSMap;
this.metricsBirthDevice = new ConcurrentHashMap<>();
this.deviceBirthMetrics = null;
}
public ConcurrentMap<MqttTopicMatcher, Integer> getMqttQoSMap() {
return mqttQoSMap;
}
public Map<String, SparkplugBProto.Payload.Metric> getMetricsBirthDevice() {
return metricsBirthDevice;
public Map<String, SparkplugBProto.Payload.Metric> getDeviceBirthMetrics() {
return deviceBirthMetrics;
}
public void setMetricsBirthDevice(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricsBirthDevice.putAll(metrics.stream()
public void setDeviceBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
if (this.deviceBirthMetrics == null) {
this.deviceBirthMetrics = new ConcurrentHashMap<>();
}
this.deviceBirthMetrics.putAll(metrics.stream()
.collect(Collectors.toMap(metric -> metric.getName(), metric -> metric)));
}

View File

@ -42,12 +42,12 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (getMetricsBirthDevice().containsKey(tsKvProto.getKv().getKey())) {
if (getDeviceBirthMetrics().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(),
SparkplugMessageType.DCMD, deviceInfo.getDeviceName());
((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
getMetricsBirthDevice().get(tsKvProto.getKv().getKey()))
getDeviceBirthMetrics().get(tsKvProto.getKv().getKey()))
.ifPresent(parent::writeAndFlush);
}
});

View File

@ -56,22 +56,22 @@ import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMetr
public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
private final SparkplugTopic sparkplugTopicNode;
private final Map<String, SparkplugBProto.Payload.Metric> metricsBirthNode;
private final Map<String, SparkplugBProto.Payload.Metric> nodeBirthMetrics;
public SparkplugNodeSessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId,
SparkplugTopic sparkplugTopicNode) {
super(deviceSessionCtx, sessionId);
this.sparkplugTopicNode = sparkplugTopicNode;
this.metricsBirthNode = new ConcurrentHashMap<>();
this.nodeBirthMetrics = new ConcurrentHashMap<>();
}
public void setMetricsBirthNode(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.metricsBirthNode.putAll(metrics.stream()
public void setNodeBirthMetrics(java.util.List<org.thingsboard.server.gen.transport.mqtt.SparkplugBProto.Payload.Metric> metrics) {
this.nodeBirthMetrics.putAll(metrics.stream()
.collect(Collectors.toMap(metric -> metric.getName(), metric -> metric)));
}
public Map<String, SparkplugBProto.Payload.Metric> getMetricsBirthNode() {
return this.metricsBirthNode;
public Map<String, SparkplugBProto.Payload.Metric> getNodeBirthMetrics() {
return this.nodeBirthMetrics;
}
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
@ -93,7 +93,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
contextListenableFuture.get().setMetricsBirthDevice(sparkplugBProto.getMetricsList());
contextListenableFuture.get().setDeviceBirthMetrics(sparkplugBProto.getMetricsList());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed add Metrics. MessageType *BIRTH.", e);
}
@ -206,10 +206,13 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
cmdPayload.addMetrics(createMetric(value.get(), ts, tsKvProto.getKv().getKey(), metricDataType));
byte[] payloadInBytes = cmdPayload.build().toByteArray();
return Optional.of(getPayloadAdaptor().createMqttPublishMsg(deviceSessionCtx, sparkplugTopic, payloadInBytes));
} else {
log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes [{}] response to MQTT sparkplug msg",
deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, tsKvProto.getKv());
}
} catch (
Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e);
} catch (Exception e) {
log.trace("DeviceId: [{}] tenantId: [{}] sessionId:[{}] Failed to convert device attributes response to MQTT sparkplug msg",
deviceSessionCtx.getDeviceInfo().getDeviceId(), deviceSessionCtx.getDeviceInfo().getTenantId(), sessionId, e);
return Optional.empty();
}
return Optional.empty();

View File

@ -232,25 +232,19 @@ public class SparkplugMetricUtil {
case Int16:
case UInt8:
case UInt16:
int valueMetric = Integer.valueOf(String.valueOf(value));
return metric.toBuilder().setIntValue(valueMetric).build();
case Int32:
return metric.toBuilder().setIntValue(((Integer) value).intValue()).build();
case UInt32:
if (value instanceof Long) {
return metric.toBuilder().setLongValue((long) value).build();
} else {
return metric.toBuilder().setIntValue((int) value).build();
}
case Int64:
case UInt64:
case DateTime:
return metric.toBuilder().setLongValue((long) value).build();
return metric.toBuilder().setLongValue(((Long) value).longValue()).build();
case Float:
return metric.toBuilder().setFloatValue((float) value).build();
return metric.toBuilder().setFloatValue(((Float) value).floatValue()).build();
case Double:
return metric.toBuilder().setDoubleValue((double) value).build();
return metric.toBuilder().setDoubleValue(((Double) value).doubleValue()).build();
case Boolean:
return metric.toBuilder().setBooleanValue((boolean) value).build();
return metric.toBuilder().setBooleanValue(((Boolean) value).booleanValue()).build();
case String:
case Text:
case UUID:
@ -309,17 +303,17 @@ public class SparkplugMetricUtil {
return metric;
}
public static Optional<Object> validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
public static Optional<Object> validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException {
if (kv.getTypeValue() <= 3) {
return validatedValuePrimitiveByTypeMetric(kv, metricDataType);
} else if (kv.getTypeValue() == 4) {
return validatedValueJsonByTypeMetric(kv, metricDataType);
} else {
return Optional.empty();
throw new ThingsboardException("Invalid type KeyValueProto " + kv.toString() + " for MetricDataType " + metricDataType.name(), ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}
public static Optional<Object> validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
public static Optional<Object> validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) throws ThingsboardException {
Optional<String> valueOpt = getValueKvProtoPrimitive(kv);
if (valueOpt.isPresent()) {
try {
@ -329,26 +323,47 @@ public class SparkplugMetricUtil {
case Int16:
case UInt8:
case UInt16:
return Optional.of(Integer.valueOf(valueOpt.get()));
// int/long
case Int32:
case UInt32:
Optional <Integer> boolInt8 = booleanStringToInt (valueOpt.get());
if(boolInt8.isPresent()) {
return Optional.of(boolInt8.get());
}
try {
return Optional.of(Integer.valueOf(valueOpt.get()));
} catch (NumberFormatException e) {
return Optional.of(Long.valueOf(valueOpt.get()));
} catch (NumberFormatException eInt) {
var i = new BigDecimal(valueOpt.get());
if (i.longValue() <= Integer.MAX_VALUE) {
return Optional.of(i.intValue());
}
throw new ThingsboardException("Invalid type value " + kv.toString() + " for MetricDataType "
+ metricDataType.name(), eInt, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
// long
case UInt32:
case Int64:
case UInt64:
case DateTime:
return Optional.of(Long.valueOf(valueOpt.get()));
Optional <Integer> boolInt64 = booleanStringToInt (valueOpt.get());
if(boolInt64.isPresent()) {
return Optional.of(Long.valueOf(boolInt64.get()));
}
var l = new BigDecimal(valueOpt.get());
return Optional.of(l.longValue());
// float
case Float:
Optional <Integer> boolFloat = booleanStringToInt (valueOpt.get());
if(boolFloat.isPresent()) {
var fb = new BigDecimal(boolFloat.get());
return Optional.of(fb.floatValue());
}
var f = new BigDecimal(valueOpt.get());
return Optional.of(f.floatValue());
// double
case Double:
Optional <Integer> boolDouble = booleanStringToInt (valueOpt.get());
if(boolDouble.isPresent()) {
return Optional.of(Double.valueOf(boolDouble.get()));
}
var dd = new BigDecimal(valueOpt.get());
return Optional.of(dd.doubleValue());
case Boolean:
@ -367,14 +382,11 @@ public class SparkplugMetricUtil {
case String:
case Text:
case UUID:
if (kv.getTypeValue() == 4) {
return Optional.of(valueOpt.get());
}
break;
return Optional.of(valueOpt.get());
}
} catch (Exception e) {
log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
return Optional.empty();
log.trace("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
throw new ThingsboardException("Invalid type value " + kv.toString() + " for MetricDataType " + metricDataType.name(), e, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
}
return Optional.empty();
@ -597,10 +609,20 @@ public class SparkplugMetricUtil {
return Optional.of(String.valueOf(kv.getBoolV()));
} else if (kv.getTypeValue() == 1) { // kvLong
return Optional.of(String.valueOf(kv.getLongV()));
} else if (kv.getTypeValue() == 2) { // kvString
return Optional.of(kv.getStringV());
} else if (kv.getTypeValue() == 3) { // kvDouble
} else if (kv.getTypeValue() == 2) { // kvDouble/float
return Optional.of(String.valueOf(kv.getDoubleV()));
} else if (kv.getTypeValue() == 3) { // kvString
return Optional.of(kv.getStringV());
} else {
return Optional.empty();
}
}
private static Optional<Integer> booleanStringToInt (String booleanStr) {
if ("true".equals(booleanStr)) {
return Optional.of(1);
} else if ("false".equals(booleanStr)) {
return Optional.of(0);
} else {
return Optional.empty();
}