diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java index a608d6c52d..0cc65da1b0 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java @@ -128,6 +128,11 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor { return protoAdaptor.convertToGatewayPublish(ctx, deviceName, rpcRequest); } + @Override + public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException { + return protoAdaptor.convertToGatewayPublish(ctx, deviceName, responseMsg, multipleAttributeKeysRequested); + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException { log.warn("[{}] invoked not implemented adaptor method! ToServerRpcResponseMsg: {} TopicBase: {}", ctx.getSessionId(), rpcResponse, topicBase); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 7aea8f3f91..d71b87efdc 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -125,13 +125,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { @Override public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException { - Map pendingAttributesRequests = ((GatewayDeviceSessionCtx) ctx).getPendingAttributesRequests(); - int requestId = responseMsg.getRequestId(); - JsonObject request = pendingAttributesRequests.getOrDefault(requestId, new JsonObject()); - boolean multipleAttrKeysRequested = - request.has("keys") && request.get("keys").getAsJsonArray().size() > 1; - pendingAttributesRequests.remove(requestId); - return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, multipleAttrKeysRequested); + return convertToGatewayPublish(ctx, deviceName, responseMsg, false); + } + + @Override + public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException { + return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, multipleAttributeKeysRequested); } @Override diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 53ef439eda..a980e91b53 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -72,6 +73,8 @@ public interface MqttTransportAdaptor { Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; + Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException; ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java index 4fa47b367d..4dc9a95d6b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java @@ -209,6 +209,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, payloadBytes)); } + @Override + public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException { + return convertToGatewayPublish(ctx, deviceName, responseMsg); + } + public static byte[] toBytes(ByteBuf inbound) { byte[] bytes = new byte[inbound.readableBytes()]; int readerIndex = inbound.readerIndex(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 593205b745..02324fd3e7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.gson.JsonObject; import io.netty.channel.ChannelFuture; import io.netty.handler.codec.mqtt.MqttMessage; import lombok.extern.slf4j.Slf4j; @@ -28,7 +27,6 @@ import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -82,7 +80,10 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple @Override public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) { try { - parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush); + boolean multipleAttrKeysReq = isMultipleAttributeKeysRequested(response.getRequestId()); + parent.getPayloadAdaptor() + .convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response, multipleAttrKeysReq) + .ifPresent(parent::writeAndFlush); } catch (Exception e) { log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); } @@ -138,8 +139,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple // This feature is not supported in the TB IoT Gateway yet. } - public Map getPendingAttributesRequests() { - return parent.getPendingAttributesRequests(); + public boolean isMultipleAttributeKeysRequested(int requestId) { + return parent.isMultipleAttributeKeysRequested(requestId); } private boolean isAckExpected(MqttMessage message) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 4b4e986d5f..12e9f786bf 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -108,8 +108,11 @@ public class GatewaySessionHandler { return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK); } - public Map getPendingAttributesRequests () { - return pendingAttributesRequests; + public boolean isMultipleAttributeKeysRequested(int requestId) { + JsonObject request = pendingAttributesRequests.getOrDefault(requestId, new JsonObject()); + boolean multipleAttributeKeysRequested = request.has("keys") && request.get("keys").getAsJsonArray().size() > 1; + pendingAttributesRequests.remove(requestId); + return multipleAttributeKeysRequested; } public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {