diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 288914822d..f150574e66 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -76,10 +76,10 @@ mqtt: adaptor: "${MQTT_ADAPTOR_NAME:JsonMqttAdaptor}" timeout: "${MQTT_TIMEOUT:10000}" # Uncomment the following lines to enable ssl for MQTT - ssl: - key_store: keystore/mqttserver.jks - key_store_password: password - key_store_type: JKS +# ssl: +# key_store: keystore/mqttserver.jks +# key_store_password: password +# key_store_type: JKS # CoAP server parameters coap: diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java index 4a392253ab..0b138dfaac 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/SessionContext.java @@ -27,8 +27,6 @@ public interface SessionContext extends SessionAwareMsg { void onMsg(SessionCtrlMsg msg) throws SessionException; - void onError(SessionException e); - boolean isClosed(); long getTimeout(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java index 6d957ceef6..c7baaafc70 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/session/ctrl/SessionCloseMsg.java @@ -24,6 +24,10 @@ public class SessionCloseMsg implements SessionCtrlMsg { private final boolean revoked; private final boolean timeout; + public static SessionCloseMsg onDisconnect(SessionId sessionId) { + return new SessionCloseMsg(sessionId, false, false); + } + public static SessionCloseMsg onError(SessionId sessionId) { return new SessionCloseMsg(sessionId, false, false); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java index 93e764ef47..640cce798b 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java @@ -74,7 +74,7 @@ public class JsonConverter { } } - private static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) { + public static void parseWithTs(BasicTelemetryUploadRequest request, JsonObject jo) { long ts = jo.get("ts").getAsLong(); JsonObject valuesObject = jo.get("values").getAsJsonObject(); for (KvEntry entry : parseValues(valuesObject)) { @@ -82,7 +82,7 @@ public class JsonConverter { } } - private static List parseValues(JsonObject valuesObject) { + public static List parseValues(JsonObject valuesObject) { List result = new ArrayList<>(); for (Entry valueEntry : valuesObject.entrySet()) { JsonElement element = valueEntry.getValue(); diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java index e9b8e22404..cecc42d31a 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/session/CoapSessionCtx.java @@ -95,17 +95,6 @@ public class CoapSessionCtx extends DeviceAwareSessionContext { } } - @Override - public void onError(SessionException e) { - if (e instanceof SessionAuthException) { - log.warn("[{}] onError: {}", sessionId, e.getMessage()); - exchange.respond(ResponseCode.UNAUTHORIZED); - } else { - log.warn("[{}] onError: {}", sessionId, e.getMessage(), e); - exchange.respond(ResponseCode.BAD_REQUEST); - } - } - @Override public SessionId getSessionId() { return sessionId; diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java index efaa0cdd2b..4bee59533d 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/session/HttpSessionCtx.java @@ -140,11 +140,6 @@ public class HttpSessionCtx extends DeviceAwareSessionContext { } - @Override - public void onError(SessionException e) { - - } - @Override public boolean isClosed() { return false; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e74495572f..fbc53ff843 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -16,7 +16,6 @@ package org.thingsboard.server.transport.mqtt; import com.fasterxml.jackson.databind.JsonNode; -import com.google.gson.JsonElement; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; @@ -38,7 +37,6 @@ import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.EncryptionUtil; import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; @@ -129,13 +127,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.trace("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) { - AdaptorToSessionActorMsg msg = null; if (gatewaySessionCtx != null) { + gatewaySessionCtx.setChannel(ctx); try { - if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { - gatewaySessionCtx.connect(getDeviceName(mqttMsg)); + if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) { + gatewaySessionCtx.onDeviceTelemetry(mqttMsg); + } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { + gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { + gatewaySessionCtx.onDeviceConnect(mqttMsg); } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { - gatewaySessionCtx.disconnect(getDeviceName(mqttMsg)); + gatewaySessionCtx.onDeviceDisconnect(mqttMsg); } } catch (RuntimeException | AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); @@ -146,11 +148,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException { - JsonElement json = JsonMqttAdaptor.validateJsonPayload(deviceSessionCtx.getSessionId(), mqttMsg.payload()); - return json.getAsJsonObject().get("device").getAsString(); - } - private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { AdaptorToSessionActorMsg msg = null; try { @@ -309,6 +306,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); + processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); + if (gatewaySessionCtx != null) { + gatewaySessionCtx.onGatewayDisconnect(); + } } private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode) { @@ -362,9 +363,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void checkGatewaySession() { Device device = deviceSessionCtx.getDevice(); - JsonNode gatewayNode = device.getAdditionalInfo().get("gateway"); - if (gatewayNode != null && gatewayNode.asBoolean()) { - gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, device); + JsonNode infoNode = device.getAdditionalInfo(); + if (infoNode != null) { + JsonNode gatewayNode = infoNode.get("gateway"); + if (gatewayNode != null && gatewayNode.asBoolean()) { + gatewaySessionCtx = new GatewaySessionCtx(processor, deviceService, authService, deviceSessionCtx); + } } } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index ae49cd589a..bf033dcae3 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -248,7 +248,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { try { String payload = payloadData.toString(UTF8); if (payload == null) { - log.warn("[{}] Payload is empty!", sessionId); + log.warn("[{}] Payload is empty!", sessionId.toUidStr()); throw new AdaptorException(new IllegalArgumentException("Payload is empty!")); } return payload; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java deleted file mode 100644 index 02f29be016..0000000000 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttGatewayAdaptor.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright © 2016-2017 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.adaptors; - -import com.google.gson.Gson; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.handler.codec.mqtt.MqttMessage; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.MsgType; -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; - -import java.nio.charset.Charset; -import java.util.Optional; - -/** - * Created by ashvayka on 19.01.17. - */ -public class JsonMqttGatewayAdaptor implements MqttGatewayAdaptor { - - private static final Gson GSON = new Gson(); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); - - @Override - public AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException { - return null; - } - - @Override - public Optional convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException { - return null; - } -} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index f7996facc1..f458b86248 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -82,11 +82,6 @@ public class DeviceSessionCtx extends DeviceAwareSessionContext { } } - @Override - public void onError(SessionException e) { - - } - @Override public boolean isClosed() { return false; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index fef155f62b..9c4bacfd87 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,26 +15,29 @@ */ package org.thingsboard.server.transport.mqtt.session; +import io.netty.handler.codec.mqtt.MqttMessage; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; -import org.thingsboard.server.common.msg.session.SessionCtrlMsg; -import org.thingsboard.server.common.msg.session.SessionType; +import org.thingsboard.server.common.msg.core.ResponseMsg; +import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.msg.session.ex.SessionException; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTransportHandler; + +import java.util.Optional; /** * Created by ashvayka on 19.01.17. */ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { + private GatewaySessionCtx parent; private final MqttSessionId sessionId; private volatile boolean closed; - public GatewayDeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, Device device) { - super(processor, authService, device); + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { + super(parent.getProcessor(), parent.getAuthService(), device); + this.parent = parent; this.sessionId = new MqttSessionId(); } @@ -49,8 +52,26 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } @Override - public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException { + public void onMsg(SessionActorToAdaptorMsg sessionMsg) throws SessionException { + Optional message = getToDeviceMsg(sessionMsg); + message.ifPresent(parent::writeAndFlush); + } + private Optional getToDeviceMsg(SessionActorToAdaptorMsg sessionMsg) { + ToDeviceMsg msg = sessionMsg.getMsg(); + switch (msg.getMsgType()) { + case STATUS_CODE_RESPONSE: + ResponseMsg responseMsg = (ResponseMsg) msg; + if (responseMsg.isSuccess()) { + MsgType requestMsgType = responseMsg.getRequestMsgType(); + Integer requestId = responseMsg.getRequestId(); + if (requestMsgType == MsgType.POST_ATTRIBUTES_REQUEST || requestMsgType == MsgType.POST_TELEMETRY_REQUEST) { + return Optional.of(MqttTransportHandler.createMqttPubAckMsg(requestId)); + } + } + break; + } + return Optional.empty(); } @Override @@ -58,11 +79,6 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } - @Override - public void onError(SessionException e) { - - } - @Override public boolean isClosed() { return closed; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 54336d9982..2badd3ab79 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -15,55 +15,177 @@ */ package org.thingsboard.server.transport.mqtt.session; +import com.google.gson.*; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; +import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest; +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; +import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; +import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; +import org.thingsboard.server.common.transport.SessionMsgProcessor; +import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; + +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; -import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.dao.device.DeviceService; +import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; /** * Created by ashvayka on 19.01.17. */ +@Slf4j public class GatewaySessionCtx { + private static final Gson GSON = new Gson(); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + private final Device gateway; + private final SessionId gatewaySessionId; private final SessionMsgProcessor processor; private final DeviceService deviceService; private final DeviceAuthService authService; private final Map devices; + private ChannelHandlerContext channel; - public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, Device gateway) { + public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, DeviceSessionCtx gatewaySessionCtx) { this.processor = processor; this.deviceService = deviceService; this.authService = authService; - this.gateway = gateway; + this.gateway = gatewaySessionCtx.getDevice(); + this.gatewaySessionId = gatewaySessionCtx.getSessionId(); this.devices = new HashMap<>(); } - public void connect(String deviceName) { - checkDeviceName(deviceName); + public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { + String deviceName = checkDeviceName(getDeviceName(msg)); Optional deviceOpt = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName); Device device = deviceOpt.orElseGet(() -> { Device newDevice = new Device(); newDevice.setTenantId(gateway.getTenantId()); + newDevice.setName(deviceName); return deviceService.saveDevice(newDevice); }); - devices.put(deviceName, new GatewayDeviceSessionCtx(processor, authService, device)); + devices.put(deviceName, new GatewayDeviceSessionCtx(this, device)); + ack(msg); } - public void disconnect(String deviceName) { - checkDeviceName(deviceName); - devices.remove(deviceName); + public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { + String deviceName = checkDeviceName(getDeviceName(msg)); + GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); + deviceSessionCtx.setClosed(true); + ack(msg); } - private void checkDeviceName(String deviceName) { - if (StringUtils.isEmpty(deviceName)) { - throw new RuntimeException(); + public void onGatewayDisconnect() { + devices.forEach((k, v) -> { + processor.process(SessionCloseMsg.onDisconnect(v.getSessionId())); + }); + } + + public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + int requestId = mqttMsg.variableHeader().messageId(); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + String deviceName = checkDeviceConnected(deviceEntry.getKey()); + if (!deviceEntry.getValue().isJsonArray()) { + throw new JsonSyntaxException("Can't parse value: " + json); + } + BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); + JsonArray deviceData = deviceEntry.getValue().getAsJsonArray(); + for (JsonElement element : deviceData) { + JsonConverter.parseWithTs(request, element.getAsJsonObject()); + } + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); + } + } else { + throw new JsonSyntaxException("Can't parse value: " + json); } } + public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + int requestId = mqttMsg.variableHeader().messageId(); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + for (Map.Entry deviceEntry : jsonObj.entrySet()) { + String deviceName = checkDeviceConnected(deviceEntry.getKey()); + if (!deviceEntry.getValue().isJsonObject()) { + throw new JsonSyntaxException("Can't parse value: " + json); + } + long ts = System.currentTimeMillis(); + BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId); + JsonObject deviceData = deviceEntry.getValue().getAsJsonObject(); + request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); + } + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + + private String checkDeviceConnected(String deviceName) { + if (!devices.containsKey(deviceName)) { + throw new RuntimeException("Device is not connected!"); + } else { + return deviceName; + } + } + + private String checkDeviceName(String deviceName) { + if (StringUtils.isEmpty(deviceName)) { + throw new RuntimeException("Device name is empty!"); + } else { + return deviceName; + } + } + + private String getDeviceName(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + return json.getAsJsonObject().get("device").getAsString(); + } + + protected SessionMsgProcessor getProcessor() { + return processor; + } + + protected DeviceAuthService getAuthService() { + return authService; + } + + public void setChannel(ChannelHandlerContext channel) { + this.channel = channel; + } + + private void ack(MqttPublishMessage msg) { + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); + } + + protected void writeAndFlush(MqttMessage mqttMessage) { + channel.writeAndFlush(mqttMessage); + } }