Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2023-05-01 15:42:06 +03:00
commit 44cf98de0e
7 changed files with 31 additions and 11 deletions

View File

@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.MetricDataType.UInt32;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType.NCMD;
import static org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopicUtil.NAMESPACE;
@ -409,7 +409,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientNodeWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
clientWithCorrectNodeAccessTokenWithNDEATH();
connectionWithNBirth(metricBirthDataType_Int32, metricBirthName_Int32, nextInt32());
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + savedGateway.getId().getId() + "/keys/attributes/" + SHARED_SCOPE;
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
@ -424,7 +424,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
protected void processClientDeviceWithCorrectAccessTokenPublish_AttributesInProfileContainsKeyAttributes() throws Exception {
long ts = calendar.getTimeInMillis();
List<Device> devices = connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices(1, ts);
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + CLIENT_SCOPE;
String urlTemplate = "/api/plugins/telemetry/DEVICE/" + devices.get(0).getId().getId() + "/keys/attributes/" + SHARED_SCOPE;
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)

View File

@ -148,6 +148,7 @@ message PostTelemetryMsg {
message PostAttributeMsg {
repeated KeyValueProto kv = 1;
bool shared = 2;
}
message GetAttributeRequestMsg {

View File

@ -535,7 +535,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!");
}
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);

View File

@ -173,7 +173,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) {
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
try {
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray());
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
} catch (Throwable e) {
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
@ -233,7 +233,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
try {
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) {
if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) {
long ts = protoMetric.getTimestamp();
String key = "bdSeq".equals(protoMetric.getName()) ?
topicTypeName + " " + protoMetric.getName() : protoMetric.getName();
@ -264,7 +264,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
try {
List<TransportApiProtos.AttributesMsg> msgs = new ArrayList<>();
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) {
if (attributesMetricNames.contains(protoMetric.getName())) {
if (matches(attributesMetricNames, protoMetric)) {
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder();
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric);
if (msgOpt.isPresent()) {
@ -281,11 +281,24 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S
}
}
private boolean matches(Set<String> attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) {
String metricName = protoMetric.getName();
for (String attributeMetricFilter : attributesMetricNames) {
if (metricName.equals(attributeMetricFilter) ||
(attributeMetricFilter.endsWith("*") && metricName.startsWith(
attributeMetricFilter.substring(0, attributeMetricFilter.length() - 1)))) {
return true;
}
}
return false;
}
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException {
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric);
if (keyValueProtoOpt.isPresent()) {
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder();
builder.addKv(keyValueProtoOpt.get());
builder.setShared(true);
return Optional.of(builder.build());
}
return Optional.empty();

View File

@ -67,13 +67,15 @@ public class ProtoConverter {
}
}
public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(byte[] bytes) throws IllegalArgumentException, InvalidProtocolBufferException {
public static TransportProtos.PostAttributeMsg validatePostAttributeMsg(TransportProtos.PostAttributeMsg msg) throws IllegalArgumentException, InvalidProtocolBufferException {
if (!CollectionUtils.isEmpty(msg.getKvList())) {
byte[] bytes = msg.toByteArray();
TransportProtos.PostAttributeMsg proto = TransportProtos.PostAttributeMsg.parseFrom(bytes);
List<TransportProtos.KeyValueProto> kvList = proto.getKvList();
if (!CollectionUtils.isEmpty(kvList)) {
List<TransportProtos.KeyValueProto> keyValueProtos = validateKeyValueProtos(kvList);
TransportProtos.PostAttributeMsg.Builder result = TransportProtos.PostAttributeMsg.newBuilder();
result.addAllKv(keyValueProtos);
result.setShared(msg.getShared());
return result.build();
} else {
throw new IllegalArgumentException("KeyValue list is empty!");

View File

@ -593,6 +593,9 @@ public class DefaultTransportService implements TransportService {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("deviceName", sessionInfo.getDeviceName());
metaData.putValue("deviceType", sessionInfo.getDeviceType());
if (msg.getShared()) {
metaData.putValue(DataConstants.SCOPE, DataConstants.SHARED_SCOPE);
}
metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false");
CustomerId customerId = getCustomerId(sessionInfo);
sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,

View File

@ -371,6 +371,7 @@ export const createDeviceProfileTransportConfiguration = (type: DeviceTransportT
deviceAttributesTopic: 'v1/devices/me/attributes',
deviceAttributesSubscribeTopic: 'v1/devices/me/attributes',
sparkplug: false,
sparkplugAttributesMetricNames: ['Node Control/*', 'Device Control/*', 'Properties/*'],
sendAckOnValidationException: false,
transportPayloadTypeConfiguration: {
transportPayloadType: TransportPayloadType.JSON,