sparkplug: add metricsBirth (repeat)

This commit is contained in:
nickAS21 2023-02-02 19:00:01 +02:00
parent da573ac53c
commit d6453525d3
5 changed files with 309 additions and 167 deletions

View File

@ -81,6 +81,7 @@ import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import javax.net.ssl.SSLPeerUnverifiedException;
@ -1263,7 +1264,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
try {
if (sparkplugSessionHandler != null) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
sparkplugSessionHandler.createMqttPublishMsg(deviceSessionCtx, notification).ifPresent(sparkplugSessionHandler::writeAndFlush);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (sparkplugSessionHandler.getMetricsBirthNode().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(),
SparkplugMessageType.NCMD);
sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
sparkplugSessionHandler.getMetricsBirthNode().get(tsKvProto.getKv().getKey()))
.ifPresent(sparkplugSessionHandler::writeAndFlush);
}
});
} else {
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
}

View File

@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@ -39,7 +41,16 @@ public class SparkplugDeviceSessionContext extends GatewayDeviceSessionContext{
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to sparkplug device", sessionId);
((SparkplugNodeSessionHandler)parent).createMqttPublishMsg(this, notification).ifPresent(parent::writeAndFlush);
notification.getSharedUpdatedList().forEach(tsKvProto -> {
if (getMetricsBirthDevice().containsKey(tsKvProto.getKv().getKey())) {
SparkplugTopic sparkplugTopic = new SparkplugTopic(((SparkplugNodeSessionHandler)parent).getSparkplugTopicNode(),
SparkplugMessageType.DCMD, deviceInfo.getDeviceName());
((SparkplugNodeSessionHandler)parent).createSparkplugMqttPublishMsg(tsKvProto,
sparkplugTopic.toString(),
getMetricsBirthDevice().get(tsKvProto.getKv().getKey()))
.ifPresent(parent::writeAndFlush);
}
});
}
}

View File

@ -86,10 +86,10 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
}
public void onTelemetryProto (int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
public void onTelemetryProto(int msgId, SparkplugBProto.Payload sparkplugBProto, String deviceName, SparkplugTopic topic) throws AdaptorException, ThingsboardException {
checkDeviceName(deviceName);
ListenableFuture<MqttDeviceAwareSessionContext> contextListenableFuture = topic.isNode() ?
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
Futures.immediateFuture(this.deviceSessionCtx) : onDeviceConnectProto(deviceName);
List<TransportProtos.PostTelemetryMsg> msgs = convertToPostTelemetry(sparkplugBProto, topic.getType().name());
if (topic.isType(NBIRTH) || topic.isType(DBIRTH)) {
try {
@ -139,7 +139,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
private ListenableFuture<MqttDeviceAwareSessionContext> onDeviceConnectProto(String deviceName) throws ThingsboardException {
try {
String deviceType = this.gateway.getDeviceType() + "-node";
String deviceType = this.gateway.getDeviceType() + "-node";
return onDeviceConnect(deviceName, deviceType);
} catch (RuntimeException e) {
log.error("Failed Sparkplug Device connect proto!", e);
@ -152,9 +152,9 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
long ts = protoMetric.getTimestamp();
String keys = "bdSeq".equals(protoMetric.getName()) ?
String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(keys, protoMetric);
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(key, protoMetric);
if (keyValueProtoOpt.isPresent()) {
List<TransportProtos.KeyValueProto> result = new ArrayList<>();
result.add(keyValueProtoOpt.get());
@ -166,6 +166,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
msgs.add(request.build());
}
}
if (DBIRTH.name().equals(topicTypeName)) {
List<TransportProtos.KeyValueProto> result = new ArrayList<>();
TransportProtos.KeyValueProto.Builder keyValueProtoBuilder = TransportProtos.KeyValueProto.newBuilder();
@ -192,29 +193,22 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler {
}
public Optional<MqttPublishMessage> createMqttPublishMsg (MqttDeviceAwareSessionContext ctx,
TransportProtos.AttributeUpdateNotificationMsg notification,
String... deviceName) {
public Optional<MqttPublishMessage> createSparkplugMqttPublishMsg(TransportProtos.TsKvProto tsKvProto,
String sparkplugTopic,
SparkplugBProto.Payload.Metric metricBirth) {
try {
long ts = notification.getSharedUpdated(0).getTs();
String key = notification.getSharedUpdated(0).getKv().getKey();
if (metricsBirthNode.containsKey(key)) {
SparkplugBProto.Payload.Metric metricBirth = metricsBirthNode.get(key);
MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype());
Optional value = validatedValueByTypeMetric(notification.getSharedUpdated(0).getKv(), metricDataType);
if (value.isPresent()) {
SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts);
cmdPayload.addMetrics(createMetric(value, ts, key, metricDataType));
byte[] payloadInBytes = cmdPayload.build().toByteArray();
String topic = deviceName == null ? sparkplugTopicNode.toString() : sparkplugTopicNode.toString()
+ "/" + deviceName;
return Optional.of(getPayloadAdaptor().createMqttPublishMsg(ctx, topic, payloadInBytes));
}
} else {
return Optional.empty();
long ts = tsKvProto.getTs();
MetricDataType metricDataType = MetricDataType.fromInteger(metricBirth.getDatatype());
Optional value = validatedValueByTypeMetric(tsKvProto.getKv(), metricDataType);
if (value.isPresent()) {
SparkplugBProto.Payload.Builder cmdPayload = SparkplugBProto.Payload.newBuilder()
.setTimestamp(ts);
cmdPayload.addMetrics(createMetric(value.get(), ts, tsKvProto.getKv().getKey(), metricDataType));
byte[] payloadInBytes = cmdPayload.build().toByteArray();
return Optional.of(getPayloadAdaptor().createMqttPublishMsg(deviceSessionCtx, sparkplugTopic, payloadInBytes));
}
} catch (Exception e) {
} catch (
Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT sparkplug msg", sessionId, e);
return Optional.empty();
}

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.ser.std.FileSerializer;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -39,6 +40,7 @@ import java.nio.FloatBuffer;
import java.nio.IntBuffer;
import java.nio.LongBuffer;
import java.nio.ShortBuffer;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Optional;
@ -74,8 +76,8 @@ public class SparkplugMetricUtil {
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(f.doubleValue()).build());
case Double:
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.DOUBLE_V)
.setDoubleV(protoMetric.getDoubleValue()).build());
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.LONG_V)
.setLongV(Double.valueOf(protoMetric.getDoubleValue()).longValue()).build());
case Int8:
case UInt8:
case Int16:
@ -103,7 +105,7 @@ public class SparkplugMetricUtil {
// byte[]
case BooleanArray:
ByteBuffer booleanByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
while (booleanByteBuffer.hasRemaining()){
while (booleanByteBuffer.hasRemaining()) {
nodeArray.add(booleanByteBuffer.get() == (byte) 0 ? false : true);
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -112,7 +114,7 @@ public class SparkplugMetricUtil {
case Bytes:
case Int8Array:
ByteBuffer byteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray());
while (byteBuffer.hasRemaining()){
while (byteBuffer.hasRemaining()) {
nodeArray.add(byteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -122,7 +124,7 @@ public class SparkplugMetricUtil {
case UInt8Array:
ShortBuffer shortByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asShortBuffer();
while (shortByteBuffer.hasRemaining()){
while (shortByteBuffer.hasRemaining()) {
nodeArray.add(shortByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -132,7 +134,7 @@ public class SparkplugMetricUtil {
case UInt16Array:
IntBuffer intByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asIntBuffer();
while (intByteBuffer.hasRemaining()){
while (intByteBuffer.hasRemaining()) {
nodeArray.add(intByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -141,7 +143,7 @@ public class SparkplugMetricUtil {
case FloatArray:
FloatBuffer floatByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asFloatBuffer();
while (floatByteBuffer.hasRemaining()){
while (floatByteBuffer.hasRemaining()) {
nodeArray.add(floatByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -150,7 +152,7 @@ public class SparkplugMetricUtil {
case DoubleArray:
DoubleBuffer doubleByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asDoubleBuffer();
while (doubleByteBuffer.hasRemaining()){
while (doubleByteBuffer.hasRemaining()) {
nodeArray.add(doubleByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -162,7 +164,7 @@ public class SparkplugMetricUtil {
case UInt32Array:
LongBuffer longByteBuffer = ByteBuffer.wrap(protoMetric.getBytesValue().toByteArray())
.asLongBuffer();
while (longByteBuffer.hasRemaining()){
while (longByteBuffer.hasRemaining()) {
nodeArray.add(longByteBuffer.get());
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -175,7 +177,7 @@ public class SparkplugMetricUtil {
new ObjectInputStream(byteArrayInputStream);
final String[] stringArray = (String[]) objectInputStream.readObject();
objectInputStream.close();
for (String s: stringArray) {
for (String s : stringArray) {
nodeArray.add(s);
}
return Optional.of(builderProto.setKey(key).setType(TransportProtos.KeyValueType.JSON_V)
@ -213,13 +215,13 @@ public class SparkplugMetricUtil {
default:
throw new ThingsboardException("Failed to decode: Unknown MetricDataType " + metricType, ThingsboardErrorCode.INVALID_ARGUMENTS);
}
} catch (Exception e){
} catch (Exception e) {
log.error("", e);
return Optional.empty();
return Optional.empty();
}
}
public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException {
public static SparkplugBProto.Payload.Metric createMetric(Object value, long ts, String key, MetricDataType metricDataType) throws ThingsboardException {
SparkplugBProto.Payload.Metric metric = SparkplugBProto.Payload.Metric.newBuilder()
.setTimestamp(ts)
.setName(key)
@ -237,7 +239,7 @@ public class SparkplugMetricUtil {
if (value instanceof Long) {
return metric.toBuilder().setLongValue((long) value).build();
} else {
return metric.toBuilder().setIntValue((int)value).build();
return metric.toBuilder().setIntValue((int) value).build();
}
case Int64:
case UInt64:
@ -307,129 +309,224 @@ public class SparkplugMetricUtil {
return metric;
}
public static Optional<Object> validatedValueByTypeMetric (TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
try {
switch (metricDataType) {
case Int8:
case Int16:
case UInt8:
case UInt16:
case Int32:
case UInt32:
case Int64:
case UInt64:
case DateTime:
if (kv.getTypeValue()==1) {
return Optional.of(Integer.valueOf(String.valueOf(kv.getLongV())));
}
break;
case Float:
if (kv.getTypeValue()==2) {
var f = new BigDecimal(String.valueOf(kv.getDoubleV()));
return Optional.of(f.floatValue());
}
break;
case Double:
if (kv.getTypeValue()==2) {
return Optional.of(kv.getLongV());
}
break;
case Boolean:
if (kv.getTypeValue()==3) {
return Optional.of(kv.getBoolV());
}
break;
case String:
case Text:
case UUID:
if (kv.getTypeValue()==4) {
return Optional.of(kv.getStringV());
}
break;
case Bytes:
case Int8Array:
if (kv.getTypeValue()==5) {
// ByteString byteString = ByteString.copyFrom((byte[]) value);
// return metric.toBuilder().setBytesValue(byteString).build();
return Optional.of(kv.getJsonV());
}
break;
case Int16Array:
case UInt8Array:
if (kv.getTypeValue()==5) {
// byte[] int16Array = shortArrayToByteArray((short[]) value);
// ByteString byteInt16Array = ByteString.copyFrom((int16Array));
return Optional.of(kv.getJsonV());
}
break;
case Int32Array:
case UInt16Array:
case Int64Array:
case UInt32Array:
case UInt64Array:
case DateTimeArray:
if (kv.getTypeValue()==5) {
// if (value instanceof int[]) {
// byte[] int32Array = integerArrayToByteArray((int[]) value);
// ByteString byteInt32Array = ByteString.copyFrom((int32Array));
// return metric.toBuilder().setBytesValue(byteInt32Array).build();
// } else {
// byte[] int64Array = longArrayToByteArray((long[]) value);
// ByteString byteInt64Array = ByteString.copyFrom((int64Array));
// return metric.toBuilder().setBytesValue(byteInt64Array).build();
// }
return Optional.of(kv.getJsonV());
}
break;
case DoubleArray:
if (kv.getTypeValue()==5) {
// byte[] doubleArray = doublArrayToByteArray((double[]) value);
// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
// return metric.toBuilder().setBytesValue(byteDoubleArray).build();
return Optional.of(kv.getJsonV());
}
break;
case FloatArray:
if (kv.getTypeValue()==5) {
// byte[] floatArray = floatArrayToByteArray((float[]) value);
// ByteString byteFloatArray = ByteString.copyFrom(floatArray);
// return metric.toBuilder().setBytesValue(byteFloatArray).build();
return Optional.of(kv.getJsonV());
}
break;
case BooleanArray:
if (kv.getTypeValue()==5) {
// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
// return metric.toBuilder().setBytesValue(byteBooleanArray).build();
return Optional.of(kv.getJsonV());
}
break;
case StringArray:
if (kv.getTypeValue()==5) {
// byte[] stringArray = stringArrayToByteArray((String[]) value);
// ByteString byteStringArray = ByteString.copyFrom(stringArray);
// return metric.toBuilder().setBytesValue(byteStringArray).build();
return Optional.of(kv.getJsonV());
}
break;
case DataSet:
case File:
case Template:
log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name());
return Optional.empty();
case Unknown:
log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name());
return Optional.empty();
}
} catch (Exception e) {
log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
public static Optional<Object> validatedValueByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
if (kv.getTypeValue() <= 3) {
return validatedValuePrimitiveByTypeMetric(kv, metricDataType);
} else if (kv.getTypeValue() == 4) {
return validatedValueJsonByTypeMetric(kv, metricDataType);
} else {
return Optional.empty();
}
}
public static Optional<Object> validatedValuePrimitiveByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
Optional<String> valueOpt = getValueKvProtoPrimitive(kv);
if (valueOpt.isPresent()) {
try {
switch (metricDataType) {
// int
case Int8:
case Int16:
case UInt8:
case UInt16:
return Optional.of(Integer.valueOf(valueOpt.get()));
// int/long
case Int32:
case UInt32:
try {
return Optional.of(Integer.valueOf(valueOpt.get()));
} catch (NumberFormatException e) {
return Optional.of(Long.valueOf(valueOpt.get()));
}
// long
case Int64:
case UInt64:
case DateTime:
return Optional.of(Long.valueOf(valueOpt.get()));
// float
case Float:
var f = new BigDecimal(valueOpt.get());
return Optional.of(f.floatValue());
// double
case Double:
var dd = new BigDecimal(valueOpt.get());
return Optional.of(dd.doubleValue());
case Boolean:
if ("true".equals(valueOpt.get())) {
return Optional.of(true);
} else if ("false".equals(valueOpt.get())) {
return Optional.of(false);
} else {
Number number = NumberFormat.getInstance().parse(valueOpt.get());
if (StringUtils.isBlank(number.toString()) || "0".equals(number.toString())) { // ok 0
return Optional.of(false);
} else {
return Optional.of(true);
}
}
case String:
case Text:
case UUID:
if (kv.getTypeValue() == 4) {
return Optional.of(valueOpt.get());
}
break;
}
} catch (Exception e) {
log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
return Optional.empty();
}
}
return Optional.empty();
}
private static byte[] shortArrayToByteArray(short[] inputs) {
public static Optional<Object> validatedValueJsonByTypeMetric(TransportProtos.KeyValueProto kv, MetricDataType metricDataType) {
// try {
// Optional<Object> valueOpt;
// switch (metricDataType) {
// // int
// case Int8:
// case Int16:
// case UInt8:
// case UInt16:
// valueOpt = getValueKvProtoPrimitive(kv);
// return valueOpt.isPresent() ? Optional.of(Integer.valueOf(String.valueOf(valueOpt.get()))) : valueOpt;
// // int/long
// case Int32:
// case UInt32:
// valueOpt = getValueKvProtoPrimitive(kv);
// try {
// return Optional.of(Integer.valueOf(String.valueOf(valueOpt.get())));
// } catch (NumberFormatException e) {
// return Optional.of(Long.valueOf(String.valueOf(valueOpt.get())));
// }
// // long
// case Int64:
// case UInt64:
// case DateTime:
// valueOpt = getValueKvProtoPrimitive(kv);
// return Optional.of(Long.valueOf(String.valueOf(valueOpt.get())));
// // float
// case Float:
// valueOpt = getValueKvProtoPrimitive(kv);
// var f = new BigDecimal(String.valueOf(kv.getDoubleV()));
// return Optional.of(f.floatValue());
// }
// break;
// // double
// case Double:
// if (kv.getTypeValue() == 1) {
// return Optional.of(kv.getLongV());
// }
// break;
// case Boolean:
// if (kv.getTypeValue() == 0) { // ok 0
// return Optional.of(kv.getBoolV());
// }
// break;
// case String:
// case Text:
// case UUID:
// if (kv.getTypeValue() == 4) {
// return Optional.of(kv.getStringV());
// }
// break;
// // byte[]
// case Bytes:
// case Int8Array:
// if (kv.getTypeValue() == 5) {
//// ByteString byteString = ByteString.copyFrom((byte[]) value);
//// return metric.toBuilder().setBytesValue(byteString).build();
// return Optional.of(kv.getJsonV());
// }
// break;
// // short[]
// case Int16Array:
// case UInt8Array:
// if (kv.getTypeValue() == 5) {
//// byte[] int16Array = shortArrayToByteArray((short[]) value);
//// ByteString byteInt16Array = ByteString.copyFrom((int16Array));
// return Optional.of(kv.getJsonV());
// }
// break;
// // int []
// case UInt16Array:
// case Int32Array:
//
// // int[] / long[]
// case UInt32Array:
//
// // long[]
// case Int64Array:
// case UInt64Array:
// case DateTimeArray:
// if (kv.getTypeValue() == 5) {
//// if (value instanceof int[]) {
//// byte[] int32Array = integerArrayToByteArray((int[]) value);
//// ByteString byteInt32Array = ByteString.copyFrom((int32Array));
//// return metric.toBuilder().setBytesValue(byteInt32Array).build();
//// } else {
//// byte[] int64Array = longArrayToByteArray((long[]) value);
//// ByteString byteInt64Array = ByteString.copyFrom((int64Array));
//// return metric.toBuilder().setBytesValue(byteInt64Array).build();
//// }
// return Optional.of(kv.getJsonV());
// }
// break;
// // double []
// case DoubleArray:
// if (kv.getTypeValue() == 5) {
//// byte[] doubleArray = doublArrayToByteArray((double[]) value);
//// ByteString byteDoubleArray = ByteString.copyFrom(doubleArray);
//// return metric.toBuilder().setBytesValue(byteDoubleArray).build();
// return Optional.of(kv.getJsonV());
// }
// break;
// // float[]
// case FloatArray:
// if (kv.getTypeValue() == 5) {
//// byte[] floatArray = floatArrayToByteArray((float[]) value);
//// ByteString byteFloatArray = ByteString.copyFrom(floatArray);
//// return metric.toBuilder().setBytesValue(byteFloatArray).build();
// return Optional.of(kv.getJsonV());
// }
// break;
// // boolean[]
// case BooleanArray:
// if (kv.getTypeValue() == 5) {
//// byte[] booleanArray = booleanArrayToByteArray((boolean[]) value);
//// ByteString byteBooleanArray = ByteString.copyFrom(booleanArray);
//// return metric.toBuilder().setBytesValue(byteBooleanArray).build();
// return Optional.of(kv.getJsonV());
// }
// break;
// // String []
// case StringArray:
// if (kv.getTypeValue() == 5) {
//// byte[] stringArray = stringArrayToByteArray((String[]) value);
//// ByteString byteStringArray = ByteString.copyFrom(stringArray);
//// return metric.toBuilder().setBytesValue(byteStringArray).build();
// return Optional.of(kv.getJsonV());
// }
// break;
// case DataSet:
// case File:
// case Template:
// log.error("Invalid type value [{}] for MetricDataType [{}]", kv, metricDataType.name());
// return Optional.empty();
// case Unknown:
// log.error("Invalid MetricDataType [{}] type, value [{}]", kv, metricDataType.name());
// return Optional.empty();
// } catch (Exception e) {
// log.error("Invalid type value [{}] for MetricDataType [{}] [{}]", kv, metricDataType.name(), e.getMessage());
// return Optional.empty();
// }
return Optional.empty();
}
private static byte[] shortArrayToByteArray(short[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 2);
for (short d : inputs) {
bb.putShort(d);
@ -437,7 +534,7 @@ public class SparkplugMetricUtil {
return bb.array();
}
private static byte[] integerArrayToByteArray(int[] inputs) {
private static byte[] integerArrayToByteArray(int[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 4);
for (int d : inputs) {
bb.putInt(d);
@ -445,7 +542,7 @@ public class SparkplugMetricUtil {
return bb.array();
}
private static byte[] longArrayToByteArray(long[] inputs) {
private static byte[] longArrayToByteArray(long[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (long d : inputs) {
bb.putLong(d);
@ -453,7 +550,7 @@ public class SparkplugMetricUtil {
return bb.array();
}
private static byte[] doublArrayToByteArray(double[] inputs) {
private static byte[] doublArrayToByteArray(double[] inputs) {
ByteBuffer bb = ByteBuffer.allocate(inputs.length * 8);
for (double d : inputs) {
bb.putDouble(d);
@ -461,7 +558,7 @@ public class SparkplugMetricUtil {
return bb.array();
}
private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException {
private static byte[] floatArrayToByteArray(float[] inputs) throws ThingsboardException {
ByteArrayOutputStream bas = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bas);
for (float f : inputs) {
@ -474,15 +571,15 @@ public class SparkplugMetricUtil {
return bas.toByteArray();
}
private static byte[] booleanArrayToByteArray(boolean[] inputs) {
private static byte[] booleanArrayToByteArray(boolean[] inputs) {
byte[] toReturn = new byte[inputs.length];
for (int entry = 0; entry < toReturn.length; entry++) {
toReturn[entry] = (byte) (inputs[entry]?1:0);
toReturn[entry] = (byte) (inputs[entry] ? 1 : 0);
}
return toReturn;
}
private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException {
private static byte[] stringArrayToByteArray(String[] inputs) throws ThingsboardException {
final ByteArrayOutputStream bas = new ByteArrayOutputStream();
try {
final ObjectOutputStream os = new ObjectOutputStream(bas);
@ -495,6 +592,19 @@ public class SparkplugMetricUtil {
return bas.toByteArray();
}
private static Optional<String> getValueKvProtoPrimitive(TransportProtos.KeyValueProto kv) {
if (kv.getTypeValue() == 0) { // boolean
return Optional.of(String.valueOf(kv.getBoolV()));
} else if (kv.getTypeValue() == 1) { // kvLong
return Optional.of(String.valueOf(kv.getLongV()));
} else if (kv.getTypeValue() == 2) { // kvString
return Optional.of(kv.getStringV());
} else if (kv.getTypeValue() == 3) { // kvDouble
return Optional.of(String.valueOf(kv.getDoubleV()));
} else {
return Optional.empty();
}
}
@JsonIgnoreProperties(
value = {"fileName"})

View File

@ -87,6 +87,23 @@ public class SparkplugTopic {
this.type = type;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.deviceId = null;
this.type = type;
}
public SparkplugTopic(SparkplugTopic sparkplugTopic, SparkplugMessageType type, String deviceId) {
super();
this.namespace = sparkplugTopic.namespace;
this.groupId = sparkplugTopic.groupId;
this.edgeNodeId = sparkplugTopic.edgeNodeId;
this.deviceId = deviceId;
this.type = type;
}
/**
* Returns the Sparkplug namespace version.
*