cleanup code

This commit is contained in:
ShvaykaD 2020-10-13 13:57:38 +03:00
parent 3cd88e6d15
commit 64cc74c26c
6 changed files with 26 additions and 30 deletions

View File

@ -19,18 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
import com.squareup.wire.schema.internal.parser.TypeElement;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Slf4j
@Data

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.github.os72.protobuf.dynamic.DynamicSchema;
@ -40,7 +39,6 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@ -56,12 +54,6 @@ public class MqttProtoDeviceProfileTransportConfiguration extends MqttDeviceProf
private String deviceTelemetryProtoSchema;
private String deviceAttributesProtoSchema;
@JsonIgnore
private Descriptors.Descriptor telemetryMsgDescriptor;
@JsonIgnore
private Descriptors.Descriptor attributesMsgDescriptor;
@Override
public DeviceTransportType getType() {
return super.getType();

View File

@ -18,10 +18,10 @@ package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.io.IOException;
@ -36,7 +36,8 @@ public class MqttTransportConfigurationDeserializer extends StdDeserializer<Mqtt
public MqttDeviceProfileTransportConfiguration deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
try {
JsonNode jsonNode = jsonParser.readValueAsTree();
if (jsonNode.has("deviceTelemetryProtoSchema") || jsonNode.has("deviceAttributesProtoSchema")) {
if (jsonNode.hasNonNull("transportPayloadType") && jsonNode.get("transportPayloadType").asText().equals(TransportPayloadType.PROTOBUF.name())) {
return jsonParser.getCodec().treeToValue(jsonNode, MqttProtoDeviceProfileTransportConfiguration.class);
} else {
return jsonParser.getCodec().treeToValue(jsonNode, MqttJsonDeviceProfileTransportConfiguration.class);

View File

@ -52,11 +52,9 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor telemetryDynamicMsgDescriptor = getDescriptor(deviceSessionCtx.getTelemetryDynamicMsgDescriptor());
try {
Descriptors.Descriptor telemetryDynamicMsgDescriptor = deviceSessionCtx.getTelemetryDynamicMsgDescriptor();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(telemetryDynamicMsgDescriptor, bytes);
String stringMsg = JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(stringMsg));
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(dynamicMsgToJson(bytes, telemetryDynamicMsgDescriptor)));
} catch (Exception e) {
throw new AdaptorException(e);
}
@ -66,11 +64,9 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
Descriptors.Descriptor attributesDynamicMessage = getDescriptor(deviceSessionCtx.getAttributesDynamicMessageDescriptor());
try {
Descriptors.Descriptor attributesDynamicMessage = deviceSessionCtx.getAttributesDynamicMessageDescriptor();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(attributesDynamicMessage, bytes);
String stringMsg = JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
return JsonConverter.convertToAttributesProto(new JsonParser().parse(stringMsg));
return JsonConverter.convertToAttributesProto(new JsonParser().parse(dynamicMsgToJson(bytes, attributesDynamicMessage)));
} catch (InvalidProtocolBufferException | IllegalArgumentException e) {
throw new AdaptorException(e);
}
@ -204,4 +200,16 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
return Integer.parseInt(topicName.substring(topic.length()));
}
private Descriptors.Descriptor getDescriptor(Descriptors.Descriptor descriptor) throws AdaptorException {
if (descriptor == null) {
throw new AdaptorException("Failed to get dynamic message descriptor!");
}
return descriptor;
}
private String dynamicMsgToJson(byte[] bytes, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(descriptor, bytes);
return JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
}
}

View File

@ -113,13 +113,17 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
if (payloadType.equals(TransportPayloadType.PROTOBUF) && mqttConfig instanceof MqttProtoDeviceProfileTransportConfiguration) {
MqttProtoDeviceProfileTransportConfiguration protoMqttConfig = (MqttProtoDeviceProfileTransportConfiguration) mqttConfig;
telemetryDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceTelemetryProtoSchema(), "telemetrySchema");
attributesDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceAttributesProtoSchema(), "attributesSchema");
updateDynamicMessageDescriptors(mqttConfig);
}
} else {
telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
}
}
private void updateDynamicMessageDescriptors(MqttDeviceProfileTransportConfiguration mqttConfig) {
MqttProtoDeviceProfileTransportConfiguration protoMqttConfig = (MqttProtoDeviceProfileTransportConfiguration) mqttConfig;
telemetryDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceTelemetryProtoSchema(), "telemetrySchema");
attributesDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceAttributesProtoSchema(), "attributesSchema");
}
}

View File

@ -88,8 +88,6 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
deviceAttributesTopic: [null, [Validators.required, this.validationMQTTTopic()]],
deviceTelemetryTopic: [null, [Validators.required, this.validationMQTTTopic()]],
transportPayloadType: [MqttTransportPayloadType.JSON, Validators.required],
// deviceTelemetryProtoSchema: [null, Validators.required],
// deviceAttributesProtoSchema: [null, Validators.required]
})
});
this.mqttDeviceProfileTransportConfigurationFormGroup.valueChanges.subscribe(() => {