diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java index b5f7c67e68..3db4138f8c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java @@ -47,7 +47,7 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @Override public Optional> getClientAttributeNames() { - return Optional.of(clientKeys); + return Optional.ofNullable(clientKeys); } @Override diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 4f91e1add4..c273b8e905 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -32,10 +32,13 @@ public class MqttTopics { public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; - public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect"; - public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect"; - public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"; - public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry"; + public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/connect"; + public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect"; + public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; + public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; + public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; + public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request"; + public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fbc53ff843..76cdf1ac5d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -134,6 +134,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionCtx.onDeviceTelemetry(mqttMsg); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); + } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { gatewaySessionCtx.onDeviceConnect(mqttMsg); } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java deleted file mode 100644 index 5641af593d..0000000000 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2017 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.adaptors; - -import io.netty.handler.codec.mqtt.MqttMessage; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.MsgType; -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; - -import java.util.Optional; - -/** - * Created by ashvayka on 19.01.17. - */ -public interface MqttGatewayAdaptor { - - AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException; - - Optional convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException; - -} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 9c4bacfd87..0a023eebad 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,25 +15,45 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.handler.codec.mqtt.MqttMessage; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; +import org.thingsboard.server.common.msg.core.GetAttributesResponse; import org.thingsboard.server.common.msg.core.ResponseMsg; +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; +import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.msg.session.ex.SessionException; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import java.nio.charset.Charset; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. */ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { + private static final Gson GSON = new Gson(); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + private GatewaySessionCtx parent; private final MqttSessionId sessionId; private volatile boolean closed; + private AtomicInteger msgIdSeq = new AtomicInteger(0); public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { super(parent.getProcessor(), parent.getAuthService(), device); @@ -70,6 +90,20 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } } break; + case GET_ATTRIBUTES_RESPONSE: + GetAttributesResponse response = (GetAttributesResponse) msg; + if (response.isSuccess()) { + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response)); + } else { + //TODO: push error handling to the gateway + } + break; + case ATTRIBUTES_UPDATE_NOTIFICATION: + AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); + case TO_DEVICE_RPC_REQUEST: + ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest)); } return Optional.empty(); } @@ -92,4 +126,61 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { public long getTimeout() { return 0; } + + private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { + JsonObject result = new JsonObject(); + result.addProperty("id", response.getRequestId()); + result.addProperty("device", device.getName()); + if (response.getData().isPresent()) { + AttributesKVMsg msg = response.getData().get(); + if (msg.getClientAttributes() != null) { + msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + if (msg.getSharedAttributes() != null) { + msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + } + return createMqttPublishMsg(topic, result); + } + + private void addValueToJson(JsonObject json, String name, KvEntry entry) { + switch (entry.getDataType()) { + case BOOLEAN: + json.addProperty(name, entry.getBooleanValue().get()); + break; + case STRING: + json.addProperty(name, entry.getStrValue().get()); + break; + case DOUBLE: + json.addProperty(name, entry.getDoubleValue().get()); + break; + case LONG: + json.addProperty(name, entry.getLongValue().get()); + break; + } + } + + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, false)); + return createMqttPublishMsg(topic, result); + } + + private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, true)); + return createMqttPublishMsg(topic, result); + } + + private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); + ByteBuf payload = ALLOCATOR.buffer(); + payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); + return new MqttPublishMessage(mqttFixedHeader, header, payload); + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 854ad8f1e9..8cb7aa8d54 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -15,9 +15,10 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.gson.*; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -26,9 +27,7 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; -import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest; -import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; +import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; @@ -40,7 +39,7 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; -import java.nio.charset.Charset; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -54,10 +53,6 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val @Slf4j public class GatewaySessionCtx { - private static final Gson GSON = new Gson(); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); - private final Device gateway; private final SessionId gatewaySessionId; private final SessionMsgProcessor processor; @@ -84,7 +79,10 @@ public class GatewaySessionCtx { newDevice.setName(deviceName); return deviceService.saveDevice(newDevice); }); - devices.put(deviceName, new GatewayDeviceSessionCtx(this, device)); + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); + devices.put(deviceName, ctx); + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); ack(msg); } @@ -127,6 +125,21 @@ public class GatewaySessionCtx { } } + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + String deviceName = checkDeviceConnected(jsonObj.get("device").getAsString()); + Integer requestId = jsonObj.get("id").getAsInt(); + String data = jsonObj.get("data").getAsString(); + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); int requestId = mqttMsg.variableHeader().messageId(); @@ -150,6 +163,29 @@ public class GatewaySessionCtx { } } + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + int requestId = jsonObj.get("id").getAsInt(); + String deviceName = jsonObj.get("device").getAsString(); + boolean clientScope = jsonObj.get("client").getAsBoolean(); + String key = jsonObj.get("key").getAsString(); + + BasicGetAttributesRequest request; + if (clientScope) { + request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null); + } else { + request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key)); + } + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + private String checkDeviceConnected(String deviceName) { if (!devices.containsKey(deviceName)) { throw new RuntimeException("Device is not connected!"); @@ -190,4 +226,5 @@ public class GatewaySessionCtx { protected void writeAndFlush(MqttMessage mqttMessage) { channel.writeAndFlush(mqttMessage); } + }