implementation updates after review

This commit is contained in:
ShvaykaD 2021-10-27 12:57:53 +03:00
parent ac12eaecd8
commit 09b75ac2e6
6 changed files with 89 additions and 95 deletions

View File

@ -965,22 +965,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
log.trace("[{}] Received get attributes response", sessionId);
String topicBase;
String topicBase = attrReqTopicType.getAttributesRequestTopicBase();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrReqTopicType);
switch (attrReqTopicType) {
case V2:
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX;
break;
case V2_JSON:
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX;
break;
case V2_PROTO:
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX;
break;
default:
topicBase = MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX;
break;
}
try {
adaptor.convertToPublish(deviceSessionCtx, response, topicBase).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
@ -991,22 +977,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
log.trace("[{}] Received attributes update notification to device", sessionId);
String topic;
String topic = attrSubTopicType.getAttributesSubTopic();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(attrSubTopicType);
switch (attrSubTopicType) {
case V2:
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
break;
case V2_JSON:
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
topic = MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
break;
default:
topic = MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
break;
}
try {
adaptor.convertToPublish(deviceSessionCtx, notification, topic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {
@ -1023,22 +995,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
String baseTopic;
String baseTopic = rpcSubTopicType.getRpcRequestTopicBase();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(rpcSubTopicType);
switch (rpcSubTopicType) {
case V2:
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC;
break;
case V2_JSON:
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC;
break;
default:
baseTopic = MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
break;
}
try {
adaptor.convertToPublish(deviceSessionCtx, rpcRequest, baseTopic).ifPresent(payload -> {
int msgId = ((MqttPublishMessage) payload).variableHeader().packetId();
@ -1075,22 +1033,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg rpcResponse) {
log.trace("[{}] Received RPC response from server", sessionId);
String baseTopic;
String baseTopic = toServerRpcSubTopicType.getRpcResponseTopicBase();
MqttTransportAdaptor adaptor = deviceSessionCtx.getAdaptor(toServerRpcSubTopicType);
switch (toServerRpcSubTopicType) {
case V2:
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC;
break;
case V2_JSON:
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC;
break;
case V2_PROTO:
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC;
break;
default:
baseTopic = MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
break;
}
try {
adaptor.convertToPublish(deviceSessionCtx, rpcResponse, baseTopic).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
} catch (Exception e) {

View File

@ -15,6 +15,32 @@
*/
package org.thingsboard.server.transport.mqtt;
import lombok.Getter;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
public enum TopicType {
V1, V2, V2_JSON, V2_PROTO
V1(MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX, MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC),
V2(MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_TOPIC_PREFIX, MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC),
V2_JSON(MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_JSON_TOPIC_PREFIX, MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC),
V2_PROTO(MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_SHORT_PROTO_TOPIC_PREFIX, MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
@Getter
private final String attributesRequestTopicBase;
@Getter
private final String attributesSubTopic;
@Getter
private final String rpcRequestTopicBase;
@Getter
private final String rpcResponseTopicBase;
TopicType(String attributesRequestTopicBase, String attributesSubTopic, String rpcRequestTopicBase, String rpcResponseTopicBase) {
this.attributesRequestTopicBase = attributesRequestTopicBase;
this.attributesSubTopic = attributesSubTopic;
this.rpcRequestTopicBase = rpcRequestTopicBase;
this.rpcResponseTopicBase = rpcResponseTopicBase;
}
}

View File

@ -40,7 +40,7 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToPostTelemetry(ctx, inbound);
} catch (AdaptorException e) {
log.trace("failed to process post telemetry request msg: {} due to: ", inbound, e);
log.trace("[{}] failed to process post telemetry request msg: {} due to: ", ctx.getSessionId(), inbound, e);
return jsonAdaptor.convertToPostTelemetry(ctx, inbound);
}
}
@ -50,7 +50,7 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToPostAttributes(ctx, inbound);
} catch (AdaptorException e) {
log.trace("failed to process post attributes request msg: {} due to: ", inbound, e);
log.trace("[{}] failed to process post attributes request msg: {} due to: ", ctx.getSessionId(), inbound, e);
return jsonAdaptor.convertToPostAttributes(ctx, inbound);
}
}
@ -60,7 +60,7 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToGetAttributes(ctx, inbound, topicBase);
} catch (AdaptorException e) {
log.trace("failed to process get attributes request msg: {} due to: ", inbound, e);
log.trace("[{}] failed to process get attributes request msg: {} due to: ", ctx.getSessionId(), inbound, e);
return jsonAdaptor.convertToGetAttributes(ctx, inbound, topicBase);
}
}
@ -70,7 +70,7 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToDeviceRpcResponse(ctx, mqttMsg, topicBase);
} catch (AdaptorException e) {
log.trace("failed to process to device rpc response msg: {} due to: ", mqttMsg, e);
log.trace("[{}] failed to process to device rpc response msg: {} due to: ", ctx.getSessionId(), mqttMsg, e);
return jsonAdaptor.convertToDeviceRpcResponse(ctx, mqttMsg, topicBase);
}
}
@ -80,7 +80,7 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToServerRpcRequest(ctx, mqttMsg, topicBase);
} catch (AdaptorException e) {
log.trace("failed to process to server rpc request msg: {} due to: ", mqttMsg, e);
log.trace("[{}] failed to process to server rpc request msg: {} due to: ", ctx.getSessionId(), mqttMsg, e);
return jsonAdaptor.convertToServerRpcRequest(ctx, mqttMsg, topicBase);
}
}
@ -90,26 +90,62 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
try {
return protoAdaptor.convertToClaimDevice(ctx, inbound);
} catch (AdaptorException e) {
log.trace("failed to process claim device request msg: {} due to: ", inbound, e);
log.trace("[{}] failed to process claim device request msg: {} due to: ", ctx.getSessionId(), inbound, e);
return jsonAdaptor.convertToClaimDevice(ctx, inbound);
}
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! GetAttributeResponseMsg: {} TopicBase: {}", ctx.getSessionId(), responseMsg, topicBase);
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
return protoAdaptor.convertToGatewayPublish(ctx, deviceName, responseMsg);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! AttributeUpdateNotificationMsg: {} Topic: {}", ctx.getSessionId(), notificationMsg, topic);
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException {
return protoAdaptor.convertToGatewayPublish(ctx, deviceName, notificationMsg);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! ToDeviceRpcRequestMsg: {} TopicBase: {}", ctx.getSessionId(), rpcRequest, topicBase);
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException {
return protoAdaptor.convertToGatewayPublish(ctx, deviceName, rpcRequest);
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! ToServerRpcResponseMsg: {} TopicBase: {}", ctx.getSessionId(), rpcResponse, topicBase);
return Optional.empty();
}
@Override
public TransportProtos.ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! MqttPublishMessage: {}", ctx.getSessionId(), inbound);
return null;
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException {
log.warn("[{}] invoked not implemented adaptor method! ProvisionDeviceResponseMsg: {}", ctx.getSessionId(), provisionResponse);
return Optional.empty();
}
@Override
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException {
return protoAdaptor.convertToPublish(ctx, firmwareChunk, requestId, chunk, firmwareType);

View File

@ -60,35 +60,23 @@ public interface MqttTransportAdaptor {
ClaimDeviceMsg convertToClaimDevice(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
default Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException {
return Optional.empty();
}
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, GetAttributeResponseMsg responseMsg, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, GetAttributeResponseMsg responseMsg) throws AdaptorException;
default Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException {
return Optional.empty();
}
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, AttributeUpdateNotificationMsg notificationMsg, String topic) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, AttributeUpdateNotificationMsg notificationMsg) throws AdaptorException;
default Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException {
return Optional.empty();
}
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest, String topicBase) throws AdaptorException;
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
default Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException {
return Optional.empty();
}
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
default ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException {
return null;
}
ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
default Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException {
return Optional.empty();
}
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ProvisionDeviceResponseMsg provisionResponse) throws AdaptorException;
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException;

View File

@ -134,18 +134,16 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
@Override
public void setDeviceProfile(DeviceProfile deviceProfile) {
super.setDeviceProfile(deviceProfile);
updateTopicFilters(deviceProfile);
updateAdaptor();
updateDeviceSessionConfiguration(deviceProfile);
}
@Override
public void onDeviceProfileUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) {
super.onDeviceProfileUpdate(sessionInfo, deviceProfile);
updateTopicFilters(deviceProfile);
updateAdaptor();
updateDeviceSessionConfiguration(deviceProfile);
}
private void updateTopicFilters(DeviceProfile deviceProfile) {
private void updateDeviceSessionConfiguration(DeviceProfile deviceProfile) {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
if (transportConfiguration.getType().equals(DeviceTransportType.MQTT) &&
transportConfiguration instanceof MqttDeviceProfileTransportConfiguration) {
@ -158,12 +156,14 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
ProtoTransportPayloadConfiguration protoTransportPayloadConfig = (ProtoTransportPayloadConfiguration) transportPayloadTypeConfiguration;
updateDynamicMessageDescriptors(protoTransportPayloadConfig);
jsonPayloadFormatCompatibilityEnabled = protoTransportPayloadConfig.isEnableCompatibilityWithJsonPayloadFormat();
useJsonPayloadFormatForDefaultDownlinkTopics = protoTransportPayloadConfig.isUseJsonPayloadFormatForDefaultDownlinkTopics();
useJsonPayloadFormatForDefaultDownlinkTopics = jsonPayloadFormatCompatibilityEnabled && protoTransportPayloadConfig.isUseJsonPayloadFormatForDefaultDownlinkTopics();
}
} else {
telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
payloadType = TransportPayloadType.JSON;
}
updateAdaptor();
}
private void updateDynamicMessageDescriptors(ProtoTransportPayloadConfiguration protoTransportPayloadConfig) {
@ -176,17 +176,17 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
public MqttTransportAdaptor getAdaptor(TopicType topicType) {
switch (topicType) {
case V2:
return getProfileAdaptor();
return getDefaultAdaptor();
case V2_JSON:
return context.getJsonMqttAdaptor();
case V2_PROTO:
return context.getProtoMqttAdaptor();
default:
return useJsonPayloadFormatForDefaultDownlinkTopics ? context.getJsonMqttAdaptor() : getProfileAdaptor();
return useJsonPayloadFormatForDefaultDownlinkTopics ? context.getJsonMqttAdaptor() : getDefaultAdaptor();
}
}
private MqttTransportAdaptor getProfileAdaptor() {
private MqttTransportAdaptor getDefaultAdaptor() {
return isJsonPayloadType() ? context.getJsonMqttAdaptor() : context.getProtoMqttAdaptor();
}

View File

@ -1099,7 +1099,7 @@
"mqtt-enable-compatibility-with-json-payload-format": "Enable compatibility with other payload formats.",
"mqtt-enable-compatibility-with-json-payload-format-hint": "When enabled, the platform will use a Protobuf payload format by default. If parsing fails, the platform will attempt to use JSON payload format. Useful for backward compatibility during firmware updates. For example, the initial release of the firmware uses Json, while the new release uses Protobuf. During the process of firmware update for the fleet of devices, it is required to support both Protobuf and JSON simultaneously. The compatibility mode introduces slight performance degradation, so it is recommended to disable this mode once all devices are updated.",
"mqtt-use-json-format-for-default-downlink-topics": "Use Json format for default downlink topics",
"mqtt-use-json-format-for-default-downlink-topics-hint": "When enabled, the platform will use Json payload format to push attributes and RPC via the following topics: <b>v1/devices/me/attributes/response/$request_id</b>, <b>v1/devices/me/attributes</b>, <b>v1/devices/me/rpc/request/$request_id</b>, <b>v1/devices/me/rpc/response/$request_id</b>. This setting does not impact attribute and rpc subscriptions sent using new (v2) topics: <b>v2/a/res/$request_id</b>, <b>v2/a</b>, <b>v2/r/req/$request_id</b>, <b>v2/r/res/$request_id</b>. <b>$request_id</b> is an integer request identifier.",
"mqtt-use-json-format-for-default-downlink-topics-hint": "When enabled, the platform will use Json payload format to push attributes and RPC via the following topics: <b>v1/devices/me/attributes/response/$request_id</b>, <b>v1/devices/me/attributes</b>, <b>v1/devices/me/rpc/request/$request_id</b>, <b>v1/devices/me/rpc/response/$request_id</b>. This setting does not impact attribute and rpc subscriptions sent using new (v2) topics: <b>v2/a/res/$request_id</b>, <b>v2/a</b>, <b>v2/r/req/$request_id</b>, <b>v2/r/res/$request_id</b>. Where <b>$request_id</b> is an integer request identifier.",
"snmp-add-mapping": "Add SNMP mapping",
"snmp-mapping-not-configured": "No mapping for OID to timeseries/telemetry configured",
"snmp-timseries-or-attribute-name": "Timeseries/attribute name for mapping",