init commit

This commit is contained in:
ShvaykaD 2020-10-12 16:21:05 +03:00
parent fd602dec7f
commit 8d4718320b
20 changed files with 450 additions and 34 deletions

View File

@ -29,7 +29,12 @@ import org.springframework.web.bind.annotation.RestController;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileInfo;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttProtoDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.page.PageData;
@ -92,6 +97,16 @@ public class DeviceProfileController extends BaseController {
checkEntity(deviceProfile.getId(), deviceProfile, Resource.DEVICE_PROFILE);
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
if (transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) {
if (transportConfiguration instanceof MqttProtoDeviceProfileTransportConfiguration) {
MqttProtoDeviceProfileTransportConfiguration protoTransportConfiguration = (MqttProtoDeviceProfileTransportConfiguration) transportConfiguration;
if (protoTransportConfiguration.getTransportPayloadType().equals(TransportPayloadType.PROTOBUF))
checkProtoSchemas(protoTransportConfiguration);
}
}
DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile));
deviceProfileCache.put(savedDeviceProfile);
@ -200,4 +215,14 @@ public class DeviceProfileController extends BaseController {
throw handleException(e);
}
}
private void checkProtoSchemas(MqttProtoDeviceProfileTransportConfiguration mqttTransportConfiguration) throws ThingsboardException {
try {
mqttTransportConfiguration.validateTransportProtoSchema(mqttTransportConfiguration.getDeviceAttributesProtoSchema(), "attributes proto schema");
mqttTransportConfiguration.validateTransportProtoSchema(mqttTransportConfiguration.getDeviceTelemetryProtoSchema(), "telemetry proto schema");
} catch (Exception exception) {
throw new ThingsboardException(exception.getMessage(), ThingsboardErrorCode.GENERAL);
}
}
}

View File

@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttJsonDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttProtoDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.controller.AbstractControllerTest;
@ -190,8 +192,12 @@ public abstract class AbstractMqttIntegrationTest extends AbstractControllerTest
deviceProfile.setDescription(transportPayloadType.name() + " Test");
DeviceProfileData deviceProfileData = new DeviceProfileData();
DefaultDeviceProfileConfiguration configuration = new DefaultDeviceProfileConfiguration();
MqttDeviceProfileTransportConfiguration transportConfiguration = new MqttDeviceProfileTransportConfiguration();
transportConfiguration.setTransportPayloadType(transportPayloadType);
MqttDeviceProfileTransportConfiguration transportConfiguration;
if (TransportPayloadType.JSON.equals(transportPayloadType)) {
transportConfiguration = new MqttJsonDeviceProfileTransportConfiguration();
} else {
transportConfiguration = new MqttProtoDeviceProfileTransportConfiguration();
}
if (!StringUtils.isEmpty(telemetryTopic)) {
transportConfiguration.setDeviceTelemetryTopic(telemetryTopic);
}

View File

@ -23,6 +23,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
@ -55,12 +56,14 @@ public abstract class AbstractMqttAttributesRequestProtoIntegrationTest extends
}
@Test
@Ignore
public void testRequestAttributesValuesFromTheServer() throws Exception {
processTestRequestAttributesValuesFromTheServer();
}
@Test
@Ignore
public void testRequestAttributesValuesFromTheServerGateway() throws Exception {
processTestGatewayRequestAttributesValuesFromTheServer();
}

View File

@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
@ -47,11 +48,13 @@ public abstract class AbstractMqttAttributesUpdatesProtoIntegrationTest extends
}
@Test
@Ignore
public void testSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processTestSubscribeToAttributesUpdates();
}
@Test
@Ignore
public void testSubscribeToAttributesUpdatesFromTheServerGateway() throws Exception {
processGatewayTestSubscribeToAttributesUpdates();
}

View File

@ -46,21 +46,25 @@ public abstract class AbstractMqttServerSideRpcProtoIntegrationTest extends Abst
}
@Test
@Ignore
public void testServerMqttOneWayRpc() throws Exception {
processOneWayRpcTest();
}
@Test
@Ignore
public void testServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTest();
}
@Test
@Ignore
public void testGatewayServerMqttOneWayRpc() throws Exception {
processOneWayRpcTestGateway("Gateway Device OneWay RPC Proto");
}
@Test
@Ignore
public void testGatewayServerMqttTwoWayRpc() throws Exception {
processTwoWayRpcTestGateway("Gateway Device TwoWay RPC Proto");
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.mqtt.telemetry.attributes;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.gen.transport.TransportApiProtos;
@ -46,6 +47,7 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testPushMqttAttributes() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
TransportProtos.PostAttributeMsg msg = getPostAttributeMsg(expectedKeys);
@ -53,6 +55,7 @@ public abstract class AbstractMqttAttributesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testPushMqttAttributesGateway() throws Exception {
TransportApiProtos.GatewayAttributesMsg.Builder gatewayAttributesMsgProtoBuilder = TransportApiProtos.GatewayAttributesMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

View File

@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
@ -48,6 +49,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testPushMqttTelemetry() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
TransportProtos.TsKvListProto tsKvListProto = getTsKvListProto(expectedKeys, 0);
@ -55,6 +57,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testPushMqttTelemetryWithTs() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
TransportProtos.TsKvListProto tsKvListProto = getTsKvListProto(expectedKeys, 10000);
@ -62,6 +65,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testPushMqttTelemetryGateway() throws Exception {
TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
@ -75,6 +79,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
}
@Test
@Ignore
public void testGatewayConnect() throws Exception {
String deviceName = "Device A";
TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName);

View File

@ -71,6 +71,18 @@
<artifactId>java-driver-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -29,7 +29,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = DefaultDeviceProfileTransportConfiguration.class, name = "DEFAULT"),
@JsonSubTypes.Type(value = MqttDeviceProfileTransportConfiguration.class, name = "MQTT"),
@JsonSubTypes.Type(value = MqttDeviceProfileTransportConfiguration.class, name = "MQTT"),
@JsonSubTypes.Type(value = Lwm2mDeviceProfileTransportConfiguration.class, name = "LWM2M")})
public interface DeviceProfileTransportConfiguration {

View File

@ -15,17 +15,40 @@
*/
package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
import com.squareup.wire.schema.internal.parser.TypeElement;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Slf4j
@Data
public class MqttDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration {
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "transportPayloadType")
@JsonSubTypes({
@JsonSubTypes.Type(value = MqttJsonDeviceProfileTransportConfiguration.class, name = "JSON"),
@JsonSubTypes.Type(value = MqttProtoDeviceProfileTransportConfiguration.class, name = "PROTOBUF")})
@JsonDeserialize(using = MqttTransportConfigurationDeserializer.class)
public abstract class MqttDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration {
private TransportPayloadType transportPayloadType = TransportPayloadType.JSON;
protected String deviceTelemetryTopic = MqttTopics.DEVICE_TELEMETRY_TOPIC;
protected String deviceAttributesTopic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
private String deviceTelemetryTopic = MqttTopics.DEVICE_TELEMETRY_TOPIC;
private String deviceAttributesTopic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
public abstract TransportPayloadType getTransportPayloadType();
@Override
public DeviceTransportType getType() {

View File

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
@Slf4j
@EqualsAndHashCode(callSuper = true)
@Data
@JsonDeserialize(using = JsonDeserializer.None.class)
public class MqttJsonDeviceProfileTransportConfiguration extends MqttDeviceProfileTransportConfiguration{
@Override
public DeviceTransportType getType() {
return super.getType();
}
@Override
public TransportPayloadType getTransportPayloadType() {
return TransportPayloadType.JSON;
}
}

View File

@ -0,0 +1,152 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.github.os72.protobuf.dynamic.EnumDefinition;
import com.github.os72.protobuf.dynamic.MessageDefinition;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.internal.parser.EnumConstantElement;
import com.squareup.wire.schema.internal.parser.EnumElement;
import com.squareup.wire.schema.internal.parser.FieldElement;
import com.squareup.wire.schema.internal.parser.MessageElement;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import com.squareup.wire.schema.internal.parser.ProtoParser;
import com.squareup.wire.schema.internal.parser.TypeElement;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
@EqualsAndHashCode(callSuper = true)
@Data
@JsonDeserialize(using = JsonDeserializer.None.class)
public class MqttProtoDeviceProfileTransportConfiguration extends MqttDeviceProfileTransportConfiguration {
public static final Location LOCATION = new Location("", "", -1, -1);
public static final String ATTRIBUTES_PROTO_SCHEMA = "attributes proto schema";
public static final String TELEMETRY_PROTO_SCHEMA = "telemetry proto schema";
private String deviceTelemetryProtoSchema;
private String deviceAttributesProtoSchema;
@JsonIgnore
private Descriptors.Descriptor telemetryMsgDescriptor;
@JsonIgnore
private Descriptors.Descriptor attributesMsgDescriptor;
@Override
public DeviceTransportType getType() {
return super.getType();
}
@Override
public TransportPayloadType getTransportPayloadType() {
return TransportPayloadType.PROTOBUF;
}
public void validateTransportProtoSchema(String schema, String schemaName) throws IllegalArgumentException {
ProtoParser schemaParser = new ProtoParser(LOCATION, schema.toCharArray());
ProtoFileElement protoFileElement;
try {
protoFileElement = schemaParser.readProtoFile();
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse: " + schemaName + " due to: " + e.getMessage());
}
List<TypeElement> types = protoFileElement.getTypes();
if (!CollectionUtils.isEmpty(types)) {
if (types.stream().noneMatch(typeElement -> typeElement instanceof MessageElement)) {
throw new IllegalArgumentException("Invalid " + schemaName + " provided! At least one Message definition should exists!");
}
} else {
throw new IllegalArgumentException("Invalid " + schemaName + " provided!");
}
}
public Descriptors.Descriptor getDynamicMessageDescriptor(String protoSchema, String schemaName) {
ProtoFileElement protoFileElement = getTransportProtoSchema(protoSchema);
DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
schemaBuilder.setName(schemaName);
schemaBuilder.setPackage(!StringUtils.isEmpty(protoFileElement.getPackageName()) ?
protoFileElement.getPackageName() : schemaName.toLowerCase());
List<TypeElement> types = protoFileElement.getTypes();
List<EnumElement> enumTypes = types.stream()
.filter(typeElement -> typeElement instanceof EnumElement)
.map(typeElement -> (EnumElement) typeElement)
.collect(Collectors.toList());
List<MessageElement> messageTypes = types.stream()
.filter(typeElement -> typeElement instanceof MessageElement)
.map(typeElement -> (MessageElement) typeElement)
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(enumTypes)) {
enumTypes.forEach(enumElement -> {
List<EnumConstantElement> enumElementTypeConstants = enumElement.getConstants();
EnumDefinition.Builder enumDefinitionBuilder = EnumDefinition.newBuilder(enumElement.getName());
if (!CollectionUtils.isEmpty(enumElementTypeConstants)) {
enumElementTypeConstants.forEach(constantElement -> enumDefinitionBuilder.addValue(constantElement.getName(), constantElement.getTag()));
}
EnumDefinition enumDefinition = enumDefinitionBuilder.build();
schemaBuilder.addEnumDefinition(enumDefinition);
});
}
if (!CollectionUtils.isEmpty(messageTypes)) {
messageTypes.forEach(messageElement -> {
List<FieldElement> messageElementFields = messageElement.getFields();
MessageDefinition.Builder messageDefinitionBuilder = MessageDefinition.newBuilder(messageElement.getName());
if (!CollectionUtils.isEmpty(messageElementFields)) {
messageElementFields.forEach(fieldElement -> messageDefinitionBuilder.addField(fieldElement.getType(), fieldElement.getName(), fieldElement.getTag(), fieldElement.getDefaultValue()));
}
MessageDefinition messageDefinition = messageDefinitionBuilder.build();
schemaBuilder.addMessageDefinition(messageDefinition);
});
MessageElement lastMsg = messageTypes.stream().reduce((previous, last) -> last).get();
try {
DynamicSchema dynamicSchema = schemaBuilder.build();
DynamicMessage.Builder builder = dynamicSchema.newMessageBuilder(lastMsg.getName());
return builder.getDescriptorForType();
} catch (Descriptors.DescriptorValidationException e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("Failed to get Message Descriptor! Message types is empty for " + schemaName + " schema!");
}
}
private ProtoFileElement getTransportProtoSchema(String protoSchema) {
return new ProtoParser(LOCATION, protoSchema.toCharArray()).readProtoFile();
}
}

View File

@ -0,0 +1,50 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j
public class MqttTransportConfigurationDeserializer extends StdDeserializer<MqttDeviceProfileTransportConfiguration> {
public MqttTransportConfigurationDeserializer() {
super(MqttDeviceProfileTransportConfiguration.class);
}
@Override
public MqttDeviceProfileTransportConfiguration deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
try {
JsonNode jsonNode = jsonParser.readValueAsTree();
if (jsonNode.has("deviceTelemetryProtoSchema") || jsonNode.has("deviceAttributesProtoSchema")) {
return jsonParser.getCodec().treeToValue(jsonNode, MqttProtoDeviceProfileTransportConfiguration.class);
} else {
return jsonParser.getCodec().treeToValue(jsonNode, MqttJsonDeviceProfileTransportConfiguration.class);
}
} catch (IOException e) {
log.trace("Failed to deserialize JSON content into equivalent tree model during creating {}!", MqttDeviceProfileTransportConfiguration.class.getSimpleName(), e);
throw new RuntimeException("Failed to deserialize JSON content into equivalent tree model during creating " + MqttDeviceProfileTransportConfiguration.class.getSimpleName() + "!", e);
}
}
}

View File

@ -208,24 +208,24 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
try {
MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
MqttTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor();
if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
TransportProtos.PostTelemetryMsg postTelemetryMsg = adaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
} else if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
TransportProtos.PostAttributeMsg postAttributeMsg = adaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
TransportProtos.GetAttributeRequestMsg getAttributeMsg = adaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = adaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = adaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
} else {
transportService.reportActivity(deviceSessionCtx.getSessionInfo());

View File

@ -15,7 +15,11 @@
*/
package org.thingsboard.server.transport.mqtt.adaptors;
import com.google.gson.JsonParser;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
@ -29,9 +33,11 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.adaptor.ProtoConverter;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext;
import java.util.Optional;
@ -44,19 +50,27 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
@Override
public TransportProtos.PostTelemetryMsg convertToPostTelemetry(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
try {
return ProtoConverter.convertToTelemetryProto(bytes);
} catch (InvalidProtocolBufferException | IllegalArgumentException e) {
Descriptors.Descriptor telemetryDynamicMsgDescriptor = deviceSessionCtx.getTelemetryDynamicMsgDescriptor();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(telemetryDynamicMsgDescriptor, bytes);
String stringMsg = JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
return JsonConverter.convertToTelemetryProto(new JsonParser().parse(stringMsg));
} catch (Exception e) {
throw new AdaptorException(e);
}
}
@Override
public TransportProtos.PostAttributeMsg convertToPostAttributes(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
DeviceSessionCtx deviceSessionCtx = (DeviceSessionCtx) ctx;
byte[] bytes = toBytes(inbound.payload());
try {
return ProtoConverter.validatePostAttributeMsg(bytes);
Descriptors.Descriptor attributesDynamicMessage = deviceSessionCtx.getAttributesDynamicMessageDescriptor();
DynamicMessage dynamicMessage = DynamicMessage.parseFrom(attributesDynamicMessage, bytes);
String stringMsg = JsonFormat.printer().includingDefaultValueFields().print(dynamicMessage);
return JsonConverter.convertToAttributesProto(new JsonParser().parse(stringMsg));
} catch (InvalidProtocolBufferException | IllegalArgumentException e) {
throw new AdaptorException(e);
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.protobuf.Descriptors;
import io.netty.channel.ChannelHandlerContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -23,6 +24,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttProtoDeviceProfileTransportConfiguration;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter;
@ -49,6 +51,8 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
public DeviceSessionCtx(UUID sessionId, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap, MqttTransportContext context) {
super(sessionId, mqttQoSMap);
@ -63,7 +67,9 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
return msgIdSeq.incrementAndGet();
}
public boolean isDeviceTelemetryTopic(String topicName) { return telemetryTopicFilter.filter(topicName); }
public boolean isDeviceTelemetryTopic(String topicName) {
return telemetryTopicFilter.filter(topicName);
}
public boolean isDeviceAttributesTopic(String topicName) {
return attributesTopicFilter.filter(topicName);
@ -77,6 +83,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
return payloadType.equals(TransportPayloadType.JSON);
}
public Descriptors.Descriptor getTelemetryDynamicMsgDescriptor() {
return telemetryDynamicMessageDescriptor;
}
public Descriptors.Descriptor getAttributesDynamicMessageDescriptor() {
return attributesDynamicMessageDescriptor;
}
@Override
public void setDeviceProfile(DeviceProfile deviceProfile) {
super.setDeviceProfile(deviceProfile);
@ -98,10 +112,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
payloadType = mqttConfig.getTransportPayloadType();
telemetryTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceTelemetryTopic());
attributesTopicFilter = MqttTopicFilterFactory.toFilter(mqttConfig.getDeviceAttributesTopic());
if (payloadType.equals(TransportPayloadType.PROTOBUF) && mqttConfig instanceof MqttProtoDeviceProfileTransportConfiguration) {
MqttProtoDeviceProfileTransportConfiguration protoMqttConfig = (MqttProtoDeviceProfileTransportConfiguration) mqttConfig;
telemetryDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceTelemetryProtoSchema(), "telemetrySchema");
attributesDynamicMessageDescriptor = protoMqttConfig.getDynamicMessageDescriptor(protoMqttConfig.getDeviceAttributesProtoSchema(), "attributesSchema");
}
} else {
telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
}
}
}

12
pom.xml
View File

@ -106,6 +106,8 @@
<commons-collections.version>3.2.2</commons-collections.version>
<java-websocket.version>1.5.0</java-websocket.version>
<micrometer.version>1.5.2</micrometer.version>
<protobuf-dynamic.version>1.0.1</protobuf-dynamic.version>
<wire-schema.version>3.4.0</wire-schema.version>
</properties>
<modules>
@ -1369,6 +1371,16 @@
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protobuf-dynamic</artifactId>
<version>${protobuf-dynamic.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
<version>${wire-schema.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -20,17 +20,6 @@
<fieldset class="fields-group">
<legend class="group-title" translate>device-profile.mqtt-device-topic-filters</legend>
<div fxLayoutGap="8px" fxLayout="column">
<mat-form-field class="mat-block">
<mat-label translate>device-profile.mqtt-device-payload-type</mat-label>
<mat-select formControlName="transportPayloadType" required>
<mat-option *ngFor="let type of mqttTransportPayloadTypes" [value]="type">
{{mqttTransportPayloadTypeTranslations.get(type) | translate}}
</mat-option>
</mat-select>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.transportPayloadType').hasError('required')">
{{ 'device-profile.mqtt-payload-type-required' | translate }}
</mat-error>
</mat-form-field>
<div fxLayout="row" fxLayoutGap="8px" fxLayout.xs="column">
<mat-form-field fxFlex>
<mat-label translate>device-profile.telemetry-topic-filter</mat-label>
@ -68,5 +57,40 @@
<div class="tb-hint" innerHTML="{{ 'device-profile.multi-level-wildcards-hint' | translate }}"></div>
</div>
</fieldset>
<fieldset class="fields-group">
<legend class="group-title" translate>device-profile.mqtt-device-payload-type</legend>
<div fxLayoutGap="8px" fxLayout="column">
<mat-form-field class="mat-block">
<mat-select formControlName="transportPayloadType" required>
<mat-option *ngFor="let type of mqttTransportPayloadTypes" [value]="type">
{{mqttTransportPayloadTypeTranslations.get(type) | translate}}
</mat-option>
</mat-select>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.transportPayloadType').hasError('required')">
{{ 'device-profile.mqtt-payload-type-required' | translate }}
</mat-error>
</mat-form-field>
<div *ngIf="protoPayloadType()" fxLayout="column">
<mat-form-field fxFlex>
<mat-label translate>device-profile.telemetry-proto-schema</mat-label>
<textarea matInput required
formControlName="deviceTelemetryProtoSchema"
rows="5"></textarea>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceTelemetryProtoSchema').hasError('required')">
{{ 'device-profile.telemetry-proto-schema-required' | translate}}
</mat-error>
</mat-form-field>
<mat-form-field fxFlex>
<mat-label translate>device-profile.attributes-proto-schema</mat-label>
<textarea matInput required
formControlName="deviceAttributesProtoSchema"
rows="5"></textarea>
<mat-error *ngIf="mqttDeviceProfileTransportConfigurationFormGroup.get('configuration.deviceAttributesProtoSchema').hasError('required')">
{{ 'device-profile.attributes-proto-schema-required' | translate}}
</mat-error>
</mat-form-field>
</div>
</div>
</fieldset>
</section>
</form>

View File

@ -28,12 +28,13 @@ import { Store } from '@ngrx/store';
import { AppState } from '@app/core/core.state';
import { coerceBooleanProperty } from '@angular/cdk/coercion';
import {
MqttTransportPayloadType,
DeviceProfileTransportConfiguration,
DeviceTransportType,
MqttDeviceProfileTransportConfiguration, mqttTransportPayloadTypeTranslationMap
MqttDeviceProfileTransportConfiguration,
MqttTransportPayloadType,
mqttTransportPayloadTypeTranslationMap
} from '@shared/models/device.models';
import { isDefinedAndNotNull } from '@core/utils';
import {isDefinedAndNotNull} from '@core/utils';
@Component({
selector: 'tb-mqtt-device-profile-transport-configuration',
@ -86,7 +87,9 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
configuration: this.fb.group({
deviceAttributesTopic: [null, [Validators.required, this.validationMQTTTopic()]],
deviceTelemetryTopic: [null, [Validators.required, this.validationMQTTTopic()]],
transportPayloadType: [MqttTransportPayloadType.JSON, Validators.required]
transportPayloadType: [MqttTransportPayloadType.JSON, Validators.required],
// deviceTelemetryProtoSchema: [null, Validators.required],
// deviceAttributesProtoSchema: [null, Validators.required]
})
});
this.mqttDeviceProfileTransportConfigurationFormGroup.valueChanges.subscribe(() => {
@ -103,6 +106,11 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
}
}
protoPayloadType(): boolean {
let configuration = this.mqttDeviceProfileTransportConfigurationFormGroup.getRawValue().configuration;
return configuration.transportPayloadType === MqttTransportPayloadType.PROTOBUF;
}
writeValue(value: MqttDeviceProfileTransportConfiguration | null): void {
if (isDefinedAndNotNull(value)) {
this.mqttDeviceProfileTransportConfigurationFormGroup.patchValue({configuration: value}, {emitEvent: false});
@ -114,6 +122,14 @@ export class MqttDeviceProfileTransportConfigurationComponent implements Control
if (this.mqttDeviceProfileTransportConfigurationFormGroup.valid) {
configuration = this.mqttDeviceProfileTransportConfigurationFormGroup.getRawValue().configuration;
configuration.type = DeviceTransportType.MQTT;
let configurationFormGroup = this.mqttDeviceProfileTransportConfigurationFormGroup.controls.configuration as FormGroup;
if (configuration.transportPayloadType === MqttTransportPayloadType.PROTOBUF) {
configurationFormGroup.addControl('deviceTelemetryProtoSchema', this.fb.control(null, Validators.required));
configurationFormGroup.addControl('deviceAttributesProtoSchema', this.fb.control(null, Validators.required));
} else {
configurationFormGroup.removeControl('deviceTelemetryProtoSchema');
configurationFormGroup.removeControl('deviceAttributesProtoSchema');
}
}
this.propagateChange(configuration);
}

View File

@ -807,6 +807,10 @@
"telemetry-topic-filter-required": "Telemetry topic filter is required.",
"attributes-topic-filter": "Attributes topic filter",
"attributes-topic-filter-required": "Attributes topic filter is required.",
"telemetry-proto-schema": "Telemetry proto schema",
"telemetry-proto-schema-required": "Telemetry proto schema is required.",
"attributes-proto-schema": "Attributes proto schema",
"attributes-proto-schema-required": "Attributes proto schema is required.",
"rpc-response-topic-filter": "RPC response topic filter",
"rpc-response-topic-filter-required": "RPC response topic filter is required.",
"not-valid-pattern-topic-filter": "Not valid pattern topic filter",