diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 4f91e1add4..0d67881f41 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -32,10 +32,11 @@ public class MqttTopics { public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; - public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect"; - public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect"; - public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"; - public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry"; + public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/connect"; + public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect"; + public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; + public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; + public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fbc53ff843..3300d3e847 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -134,6 +134,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionCtx.onDeviceTelemetry(mqttMsg); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { gatewaySessionCtx.onDeviceConnect(mqttMsg); } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 9c4bacfd87..4a0c861db0 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,25 +15,43 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.handler.codec.mqtt.MqttMessage; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; import org.thingsboard.server.common.msg.core.ResponseMsg; +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; +import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.msg.session.ex.SessionException; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import java.nio.charset.Charset; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. */ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { + private static final Gson GSON = new Gson(); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + private GatewaySessionCtx parent; private final MqttSessionId sessionId; private volatile boolean closed; + private AtomicInteger msgIdSeq = new AtomicInteger(0); public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { super(parent.getProcessor(), parent.getAuthService(), device); @@ -70,6 +88,12 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } } break; + case ATTRIBUTES_UPDATE_NOTIFICATION: + AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); + case TO_DEVICE_RPC_REQUEST: + ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest)); } return Optional.empty(); } @@ -92,4 +116,28 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { public long getTimeout() { return 0; } + + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, false)); + return createMqttPublishMsg(topic, result); + } + + private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, true)); + return createMqttPublishMsg(topic, result); + } + + private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); + ByteBuf payload = ALLOCATOR.buffer(); + payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); + return new MqttPublishMessage(mqttFixedHeader, header, payload); + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 2badd3ab79..2b17053622 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -29,14 +29,17 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; +import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; @@ -54,10 +57,6 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val @Slf4j public class GatewaySessionCtx { - private static final Gson GSON = new Gson(); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); - private final Device gateway; private final SessionId gatewaySessionId; private final SessionMsgProcessor processor; @@ -125,6 +124,21 @@ public class GatewaySessionCtx { } } + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + String deviceName = checkDeviceConnected(jsonObj.get("device").getAsString()); + Integer requestId = jsonObj.get("requestId").getAsInt(); + String data = jsonObj.get("data").getAsString(); + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); int requestId = mqttMsg.variableHeader().messageId(); @@ -188,4 +202,5 @@ public class GatewaySessionCtx { protected void writeAndFlush(MqttMessage mqttMessage) { channel.writeAndFlush(mqttMessage); } + }