From cc5b18f30e51e983d6bd7292875b553f26f0a4b5 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 12 Oct 2018 11:18:41 +0300 Subject: [PATCH] MQTT Gateway API Implementation --- .../transport/adaptor/JsonConverter.java | 17 +- .../transport/mqtt/MqttTransportHandler.java | 4 +- .../mqtt/adaptors/JsonMqttAdaptor.java | 109 ++----------- .../mqtt/adaptors/MqttTransportAdaptor.java | 3 + .../mqtt/session/GatewayDeviceSessionCtx.java | 151 ++---------------- .../mqtt/session/GatewaySessionCtx.java | 7 + 6 files changed, 47 insertions(+), 244 deletions(-) diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index e02bb4bf34..1fcd60730c 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -37,11 +37,16 @@ import org.thingsboard.server.common.msg.core.BasicRequest; import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; -import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg; -import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.*; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import java.util.ArrayList; import java.util.List; @@ -446,4 +451,10 @@ public class JsonConverter { return error; } + public static JsonElement toGatewayJson(String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) { + JsonObject result = new JsonObject(); + result.addProperty(DEVICE_PROPERTY, deviceName); + result.add("data", JsonConverter.toJson(rpcRequest, true)); + return result; + } } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 21d33354cd..1f1288c6da 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -85,7 +85,7 @@ import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; @Slf4j public class MqttTransportHandler extends ChannelInboundHandlerAdapter implements GenericFutureListener>, SessionMsgListener { - public static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; + private static final MqttQoS MAX_SUPPORTED_QOS_LVL = AT_LEAST_ONCE; private final UUID sessionId; private final MqttTransportContext context; @@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private volatile DeviceSessionCtx deviceSessionCtx; private volatile GatewaySessionCtx gatewaySessionCtx; - public MqttTransportHandler(MqttTransportContext context) { + MqttTransportHandler(MqttTransportContext context) { this.sessionId = UUID.randomUUID(); this.context = context; this.transportService = context.getTransportService(); diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 85dc813428..4bde5a35fe 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -23,27 +23,23 @@ import com.google.gson.JsonSyntaxException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.msg.core.*; -import org.thingsboard.server.common.msg.kv.AttributesKVMsg; -import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.MqttTopics; -import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; -import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext; import java.nio.charset.Charset; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -168,48 +164,16 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC + rpcRequest.getRequestId(), JsonConverter.toJson(rpcRequest, false))); } + @Override + public Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException { + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_RPC_TOPIC, JsonConverter.toGatewayJson(deviceName, rpcRequest))); + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, TransportProtos.ToServerRpcResponseMsg rpcResponse) { return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC + rpcResponse.getRequestId(), JsonConverter.toJson(rpcResponse))); } - private MqttMessage convertResponseMsg(MqttDeviceAwareSessionContext ctx, ToDeviceMsg msg, - ResponseMsg responseMsg, Optional responseError) throws AdaptorException { - MqttMessage result = null; - SessionMsgType requestMsgType = responseMsg.getRequestMsgType(); - Integer requestId = responseMsg.getRequestId(); - if (requestId >= 0) { - if (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST) { - result = MqttTransportHandler.createMqttPubAckMsg(requestId); - } else if (requestMsgType == SessionMsgType.GET_ATTRIBUTES_REQUEST) { - GetAttributesResponse response = (GetAttributesResponse) msg; - Optional responseData = response.getData(); - if (response.isSuccess() && responseData.isPresent()) { - result = createMqttPublishMsg(ctx, - MqttTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX + requestId, - responseData.get(), true); - } else { - if (responseError.isPresent()) { - throw new AdaptorException(responseError.get()); - } - } - } - } - return result; - } - - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, AttributesKVMsg msg, boolean asMap) { - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap)); - } - - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, ToDeviceRpcRequestMsg msg) { - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false)); - } - - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, TransportProtos.ToServerRpcResponseMsg msg) { - return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg)); - } - private MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, JsonElement json) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0); @@ -219,39 +183,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { return new MqttPublishMessage(mqttFixedHeader, header, payload); } - private FromDeviceMsg convertToGetAttributesRequest(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String topicName = inbound.variableHeader().topicName(); - try { - Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())); - String payload = inbound.payload().toString(UTF8); - JsonElement requestBody = new JsonParser().parse(payload); - Set clientKeys = toStringSet(requestBody, "clientKeys"); - Set sharedKeys = toStringSet(requestBody, "sharedKeys"); - if (clientKeys == null && sharedKeys == null) { - return new BasicGetAttributesRequest(requestId); - } else { - return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys); - } - } catch (RuntimeException e) { - log.warn("Failed to decode get attributes request", e); - throw new AdaptorException(e); - } - } - - private FromDeviceMsg convertToRpcCommandResponse(MqttDeviceAwareSessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String topicName = inbound.variableHeader().topicName(); - try { - Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length())); - String payload = inbound.payload().toString(UTF8); - return new ToDeviceRpcResponseMsg( - requestId, - payload); - } catch (RuntimeException e) { - log.warn("Failed to decode get attributes request", e); - throw new AdaptorException(e); - } - } - private Set toStringSet(JsonElement requestBody, String name) { JsonElement element = requestBody.getAsJsonObject().get(name); if (element != null) { @@ -261,24 +192,6 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } - private AttributesUpdateRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); - try { - return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().packetId()); - } catch (IllegalStateException | JsonSyntaxException ex) { - throw new AdaptorException(ex); - } - } - - private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { - String payload = validatePayload(ctx.getSessionId(), inbound.payload()); - try { - return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().packetId()); - } catch (IllegalStateException | JsonSyntaxException ex) { - throw new AdaptorException(ex); - } - } - public static JsonElement validateJsonPayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { String payload = validatePayload(sessionId, payloadData); try { @@ -288,7 +201,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { } } - public static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { + private static String validatePayload(UUID sessionId, ByteBuf payloadData) throws AdaptorException { try { String payload = payloadData.toString(UTF8); if (payload == null) { diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 37b5395b69..a653e4c93e 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -18,6 +18,7 @@ package org.thingsboard.server.transport.mqtt.adaptors; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; @@ -56,5 +57,7 @@ public interface MqttTransportAdaptor { Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; + Optional convertToGatewayPublish(MqttDeviceAwareSessionContext ctx, String deviceName, ToDeviceRpcRequestMsg rpcRequest) throws AdaptorException; + Optional convertToPublish(MqttDeviceAwareSessionContext ctx, ToServerRpcResponseMsg rpcResponse) throws AdaptorException; } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 387d9cf439..32d9e55832 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,43 +15,14 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.sun.xml.internal.bind.v2.TODO; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.handler.codec.mqtt.MqttFixedHeader; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageType; -import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; -import org.thingsboard.server.common.msg.core.GetAttributesResponse; -import org.thingsboard.server.common.msg.core.ResponseMsg; -import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; -import org.thingsboard.server.common.msg.kv.AttributesKVMsg; -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; -import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.msg.session.ToDeviceMsg; import org.thingsboard.server.common.transport.SessionMsgListener; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; -import org.thingsboard.server.transport.mqtt.MqttTopics; -import org.thingsboard.server.transport.mqtt.MqttTransportHandler; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. @@ -59,15 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener { - private static final Gson GSON = new Gson(); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); - private final GatewaySessionCtx parent; private final UUID sessionId; private final SessionInfoProto sessionInfo; - private volatile boolean closed; - private AtomicInteger msgIdSeq = new AtomicInteger(0); public GatewayDeviceSessionCtx(GatewaySessionCtx parent, DeviceInfoProto deviceInfo, ConcurrentMap mqttQoSMap) { super(mqttQoSMap); @@ -90,109 +55,9 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple return sessionId; } - private Optional getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) { - ToDeviceMsg msg = sessionMsg.getMsg(); - switch (msg.getSessionMsgType()) { - case STATUS_CODE_RESPONSE: - ResponseMsg responseMsg = (ResponseMsg) msg; - if (responseMsg.isSuccess()) { - SessionMsgType requestMsgType = responseMsg.getRequestMsgType(); - Integer requestId = responseMsg.getRequestId(); - if (requestId >= 0 && (requestMsgType == SessionMsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == SessionMsgType.POST_TELEMETRY_REQUEST)) { - return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId)); - } - } - break; - case GET_ATTRIBUTES_RESPONSE: - GetAttributesResponse response = (GetAttributesResponse) msg; - if (response.isSuccess()) { - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response)); - } else { - //TODO: push error handling to the gateway - } - break; - case ATTRIBUTES_UPDATE_NOTIFICATION: - AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); - case TO_DEVICE_RPC_REQUEST: - ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; - return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest)); - default: - break; - } - return Optional.empty(); - } - - private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { - JsonObject result = new JsonObject(); - result.addProperty("id", response.getRequestId()); -// result.addProperty(DEVICE_PROPERTY, device.getName()); - Optional responseData = response.getData(); - if (responseData.isPresent()) { - AttributesKVMsg msg = responseData.get(); - if (msg.getClientAttributes() != null) { - addValues(result, msg.getClientAttributes()); - } - if (msg.getSharedAttributes() != null) { - addValues(result, msg.getSharedAttributes()); - } - } - return createMqttPublishMsg(topic, result); - } - - private void addValues(JsonObject result, List kvList) { - if (kvList.size() == 1) { - addValueToJson(result, "value", kvList.get(0)); - } else { - JsonObject values; - if (result.has("values")) { - values = result.get("values").getAsJsonObject(); - } else { - values = new JsonObject(); - result.add("values", values); - } - kvList.forEach(value -> addValueToJson(values, value.getKey(), value)); - } - } - - private void addValueToJson(JsonObject json, String name, KvEntry entry) { - switch (entry.getDataType()) { - case BOOLEAN: - entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean)); - break; - case STRING: - entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString)); - break; - case DOUBLE: - entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble)); - break; - case LONG: - entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong)); - break; - } - } - - private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { - JsonObject result = new JsonObject(); -// result.addProperty(DEVICE_PROPERTY, device.getName()); - result.add("data", JsonConverter.toJson(data, false)); - return createMqttPublishMsg(topic, result); - } - - private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { - JsonObject result = new JsonObject(); -// result.addProperty(DEVICE_PROPERTY, device.getName()); - result.add("data", JsonConverter.toJson(data, true)); - return createMqttPublishMsg(topic, result); - } - - private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { - MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(MqttMessageType.PUBLISH, false, getQoSForTopic(topic), false, 0); - MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); - ByteBuf payload = ALLOCATOR.buffer(); - payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); - return new MqttPublishMessage(mqttFixedHeader, header, payload); + @Override + public int nextMsgId() { + return parent.nextMsgId(); } SessionInfoProto getSessionInfo() { @@ -223,12 +88,16 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple } @Override - public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg toDeviceRequest) { - + public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg request) { + try { + parent.getAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(parent::writeAndFlush); + } catch (Exception e) { + log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e); + } } @Override public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) { - TODO + // This feature is not supported in the TB IoT Gateway yet. } } diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 3788d57e54..e038a4d269 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -52,6 +52,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. @@ -70,10 +71,12 @@ public class GatewaySessionCtx { private final Map devices; private final ConcurrentMap mqttQoSMap; private final ChannelHandlerContext channel; + private final DeviceSessionCtx deviceSessionCtx; public GatewaySessionCtx(MqttTransportContext context, DeviceSessionCtx deviceSessionCtx, UUID sessionId) { this.context = context; this.transportService = context.getTransportService(); + this.deviceSessionCtx = deviceSessionCtx; this.gateway = deviceSessionCtx.getDeviceInfo(); this.sessionId = sessionId; this.devices = new ConcurrentHashMap<>(); @@ -357,4 +360,8 @@ public class GatewaySessionCtx { public MqttTransportAdaptor getAdaptor() { return context.getAdaptor(); } + + public int nextMsgId() { + return deviceSessionCtx.nextMsgId(); + } }