refactor GatewayDeviceSessionCtx and MqttAdaptors
refactor GatewayDeviceSessionCtx to determine the value of multipleAttrKeysRequested before calling the JsonMqttAdaptor, add corresponding convertToGatewayPublish method to adaptors interface with multipleAttrKeysRequested parameter, refactor other adaptors to deal with new method
This commit is contained in:
parent
443bb2280f
commit
35c30b7678
@ -128,6 +128,11 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor {
|
||||
return protoAdaptor.convertToGatewayPublish(ctx, deviceName, rpcRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException {
|
||||
return protoAdaptor.convertToGatewayPublish(ctx, deviceName, responseMsg, multipleAttributeKeysRequested);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException {
|
||||
log.warn("[{}] invoked not implemented adaptor method! ToServerRpcResponseMsg: {} TopicBase: {}", ctx.getSessionId(), rpcResponse, topicBase);
|
||||
|
||||
@ -125,13 +125,12 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg) throws AdaptorException {
|
||||
Map<Integer, JsonObject> pendingAttributesRequests = ((GatewayDeviceSessionCtx) ctx).getPendingAttributesRequests();
|
||||
int requestId = responseMsg.getRequestId();
|
||||
JsonObject request = pendingAttributesRequests.getOrDefault(requestId, new JsonObject());
|
||||
boolean multipleAttrKeysRequested =
|
||||
request.has("keys") && request.get("keys").getAsJsonArray().size() > 1;
|
||||
pendingAttributesRequests.remove(requestId);
|
||||
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, multipleAttrKeysRequested);
|
||||
return convertToGatewayPublish(ctx, deviceName, responseMsg, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException {
|
||||
return processConvertFromGatewayAttributeResponseMsg(ctx, deviceName, responseMsg, multipleAttributeKeysRequested);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage;
|
||||
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||
@ -72,6 +73,8 @@ public interface MqttTransportAdaptor {
|
||||
|
||||
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException;
|
||||
|
||||
Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException;
|
||||
|
||||
Optional<MqttMessage> convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse, String topicBase) throws AdaptorException;
|
||||
|
||||
ProvisionDeviceRequestMsg convertToProvisionRequestMsg(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException;
|
||||
|
||||
@ -209,6 +209,11 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor {
|
||||
return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, payloadBytes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<MqttMessage> convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.GetAttributeResponseMsg responseMsg, boolean multipleAttributeKeysRequested) throws AdaptorException {
|
||||
return convertToGatewayPublish(ctx, deviceName, responseMsg);
|
||||
}
|
||||
|
||||
public static byte[] toBytes(ByteBuf inbound) {
|
||||
byte[] bytes = new byte[inbound.readableBytes()];
|
||||
int readerIndex = inbound.readerIndex();
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.session;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,7 +27,6 @@ import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@ -82,7 +80,10 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
|
||||
@Override
|
||||
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
|
||||
try {
|
||||
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush);
|
||||
boolean multipleAttrKeysReq = isMultipleAttributeKeysRequested(response.getRequestId());
|
||||
parent.getPayloadAdaptor()
|
||||
.convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response, multipleAttrKeysReq)
|
||||
.ifPresent(parent::writeAndFlush);
|
||||
} catch (Exception e) {
|
||||
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||
}
|
||||
@ -138,8 +139,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
|
||||
// This feature is not supported in the TB IoT Gateway yet.
|
||||
}
|
||||
|
||||
public Map<Integer, JsonObject> getPendingAttributesRequests() {
|
||||
return parent.getPendingAttributesRequests();
|
||||
public boolean isMultipleAttributeKeysRequested(int requestId) {
|
||||
return parent.isMultipleAttributeKeysRequested(requestId);
|
||||
}
|
||||
|
||||
private boolean isAckExpected(MqttMessage message) {
|
||||
|
||||
@ -108,8 +108,11 @@ public class GatewaySessionHandler {
|
||||
return new ConcurrentReferenceHashMap<>(16, ReferenceType.WEAK);
|
||||
}
|
||||
|
||||
public Map<Integer, JsonObject> getPendingAttributesRequests () {
|
||||
return pendingAttributesRequests;
|
||||
public boolean isMultipleAttributeKeysRequested(int requestId) {
|
||||
JsonObject request = pendingAttributesRequests.getOrDefault(requestId, new JsonObject());
|
||||
boolean multipleAttributeKeysRequested = request.has("keys") && request.get("keys").getAsJsonArray().size() > 1;
|
||||
pendingAttributesRequests.remove(requestId);
|
||||
return multipleAttributeKeysRequested;
|
||||
}
|
||||
|
||||
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user