Use shared attributes for sparkplug devices

This commit is contained in:
Andrii Shvaika 2023-04-25 19:03:17 +03:00
parent 64908e8d66
commit a405999a88
5 changed files with 13 additions and 6 deletions

View File

@ -148,6 +148,7 @@ message PostTelemetryMsg {
message PostAttributeMsg {
repeated KeyValueProto kv = 1;
bool shared = 2;
}
message GetAttributeRequestMsg {

View File

@ -535,7 +535,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!");
}
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);

View File

@ -173,7 +173,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
@ -286,6 +286,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
if (keyValueProtoOpt.isPresent()) {
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder();
builder.addKv(keyValueProtoOpt.get());
builder.setShared(true);
return Optional.of(builder.build());
}
return Optional.empty();

View File

@ -67,13 +67,15 @@ public class ProtoConverter {
}
}
public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(byte[] bytes) throws IllegalArgumentException, InvalidProtocolBufferException {
public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(TransportProtos.PostAttributeMsg msg) throws IllegalArgumentException, InvalidProtocolBufferException {
if (!CollectionUtils.isEmpty(msg.getKvList())) {
byte[] bytes = msg.toByteArray();
TransportProtos.PostAttributeMsg proto = TransportProtos.PostAttributeMsg.parseFrom(bytes);
List<TransportProtos.KeyValueProto> kvList = proto.getKvList();
if (!CollectionUtils.isEmpty(kvList)) {
List<TransportProtos.KeyValueProto> keyValueProtos = validateKeyValueProtos(kvList);
TransportProtos.PostAttributeMsg.Builder result = TransportProtos.PostAttributeMsg.newBuilder();
result.addAllKv(keyValueProtos);
result.setShared(msg.getShared());
return result.build();
} else {
throw new IllegalArgumentException("KeyValue list is empty!");

View File

@ -593,6 +593,9 @@ public class DefaultTransportService implements TransportService {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
if (msg.getShared()) {
metaData.putValue(DataConstants.SCOPE, DataConstants.SHARED_SCOPE);
}
metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false");
CustomerId customerId = getCustomerId(sessionInfo);
sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,