From a2d4a759ef718e257c3b44c45aec4fc6ff2ee486 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 24 Feb 2017 11:36:35 +0200 Subject: [PATCH 1/4] Implementation --- .../server/transport/mqtt/MqttTopics.java | 9 ++-- .../transport/mqtt/MqttTransportHandler.java | 2 + .../mqtt/session/GatewayDeviceSessionCtx.java | 50 ++++++++++++++++++- .../mqtt/session/GatewaySessionCtx.java | 23 +++++++-- 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 4f91e1add4..0d67881f41 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -32,10 +32,11 @@ public class MqttTopics { public static final String DEVICE_ATTRIBUTES_TOPIC = BASE_DEVICE_API_TOPIC + "/attributes"; public static final String BASE_GATEWAY_API_TOPIC = "v1/gateway"; - public static final String GATEWAY_CONNECT_TOPIC = "v1/gateway/connect"; - public static final String GATEWAY_DISCONNECT_TOPIC = "v1/gateway/disconnect"; - public static final String GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"; - public static final String GATEWAY_TELEMETRY_TOPIC = "v1/gateway/telemetry"; + public static final String GATEWAY_CONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/connect"; + public static final String GATEWAY_DISCONNECT_TOPIC = BASE_GATEWAY_API_TOPIC + "/disconnect"; + public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; + public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; + public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fbc53ff843..3300d3e847 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -134,6 +134,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionCtx.onDeviceTelemetry(mqttMsg); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { + gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { gatewaySessionCtx.onDeviceConnect(mqttMsg); } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 9c4bacfd87..4a0c861db0 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -15,25 +15,43 @@ */ package org.thingsboard.server.transport.mqtt.session; -import io.netty.handler.codec.mqtt.MqttMessage; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; import org.thingsboard.server.common.msg.core.ResponseMsg; +import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; +import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.msg.session.ex.SessionException; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; +import java.nio.charset.Charset; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; /** * Created by ashvayka on 19.01.17. */ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { + private static final Gson GSON = new Gson(); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + private GatewaySessionCtx parent; private final MqttSessionId sessionId; private volatile boolean closed; + private AtomicInteger msgIdSeq = new AtomicInteger(0); public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device) { super(parent.getProcessor(), parent.getAuthService(), device); @@ -70,6 +88,12 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } } break; + case ATTRIBUTES_UPDATE_NOTIFICATION: + AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); + case TO_DEVICE_RPC_REQUEST: + ToDeviceRpcRequestMsg rpcRequest = (ToDeviceRpcRequestMsg) msg; + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_RPC_TOPIC, rpcRequest)); } return Optional.empty(); } @@ -92,4 +116,28 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { public long getTimeout() { return 0; } + + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, false)); + return createMqttPublishMsg(topic, result); + } + + private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) { + JsonObject result = new JsonObject(); + result.addProperty("device", device.getName()); + result.add("data", JsonConverter.toJson(data, true)); + return createMqttPublishMsg(topic, result); + } + + private MqttPublishMessage createMqttPublishMsg(String topic, JsonElement json) { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); + MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, msgIdSeq.incrementAndGet()); + ByteBuf payload = ALLOCATOR.buffer(); + payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); + return new MqttPublishMessage(mqttFixedHeader, header, payload); + } + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 2badd3ab79..2b17053622 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -29,14 +29,17 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; +import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; +import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; @@ -54,10 +57,6 @@ import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.val @Slf4j public class GatewaySessionCtx { - private static final Gson GSON = new Gson(); - private static final Charset UTF8 = Charset.forName("UTF-8"); - private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); - private final Device gateway; private final SessionId gatewaySessionId; private final SessionMsgProcessor processor; @@ -125,6 +124,21 @@ public class GatewaySessionCtx { } } + public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + String deviceName = checkDeviceConnected(jsonObj.get("device").getAsString()); + Integer requestId = jsonObj.get("requestId").getAsInt(); + String data = jsonObj.get("data").getAsString(); + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); int requestId = mqttMsg.variableHeader().messageId(); @@ -188,4 +202,5 @@ public class GatewaySessionCtx { protected void writeAndFlush(MqttMessage mqttMessage) { channel.writeAndFlush(mqttMessage); } + } From 162101c9b7d756535e7f7b39d1ce8e87d09e7e52 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 2 Mar 2017 17:55:47 +0200 Subject: [PATCH 2/4] TB-41: Improvements --- .../mqtt/adaptors/MqttGatewayAdaptor.java | 36 ------------------- .../mqtt/session/GatewaySessionCtx.java | 21 ++++++----- 2 files changed, 12 insertions(+), 45 deletions(-) delete mode 100644 transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java deleted file mode 100644 index 5641af593d..0000000000 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttGatewayAdaptor.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2017 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.adaptors; - -import io.netty.handler.codec.mqtt.MqttMessage; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.MsgType; -import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; - -import java.util.Optional; - -/** - * Created by ashvayka on 19.01.17. - */ -public interface MqttGatewayAdaptor { - - AdaptorToSessionActorMsg convertToActorMsg(GatewaySessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException; - - Optional convertToAdaptorMsg(GatewaySessionCtx ctx, SessionActorToAdaptorMsg msg) throws AdaptorException; - -} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 2b17053622..e0929e332a 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,10 +26,8 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.msg.core.BasicTelemetryUploadRequest; -import org.thingsboard.server.common.msg.core.BasicUpdateAttributesRequest; -import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; -import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; +import org.thingsboard.server.common.msg.core.*; +import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; import org.thingsboard.server.common.msg.session.FromDeviceMsg; @@ -38,6 +36,7 @@ import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; +import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; @@ -49,6 +48,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static org.thingsboard.server.common.msg.session.MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; /** @@ -83,7 +83,10 @@ public class GatewaySessionCtx { newDevice.setName(deviceName); return deviceService.saveDevice(newDevice); }); - devices.put(deviceName, new GatewayDeviceSessionCtx(this, device)); + GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device); + devices.put(deviceName, ctx); + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg()))); + processor.process(new BasicToDeviceActorSessionMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg()))); ack(msg); } From 9305cbae1974bb8508679a5440a9dac0c685f9bf Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 3 Mar 2017 16:42:36 +0200 Subject: [PATCH 3/4] TB-41: Implementation --- .../msg/core/BasicGetAttributesRequest.java | 2 +- .../server/transport/mqtt/MqttTopics.java | 2 + .../transport/mqtt/MqttTransportHandler.java | 2 + .../mqtt/session/GatewayDeviceSessionCtx.java | 43 ++++++++++++++++++ .../mqtt/session/GatewaySessionCtx.java | 45 +++++++++++++------ 5 files changed, 80 insertions(+), 14 deletions(-) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java index b5f7c67e68..3db4138f8c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java @@ -47,7 +47,7 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @Override public Optional> getClientAttributeNames() { - return Optional.of(clientKeys); + return Optional.ofNullable(clientKeys); } @Override diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 0d67881f41..c273b8e905 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -37,6 +37,8 @@ public class MqttTopics { public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; + public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request"; + public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 3300d3e847..76cdf1ac5d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -134,6 +134,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionCtx.onDeviceTelemetry(mqttMsg); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 4a0c861db0..0a023eebad 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -24,7 +24,9 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; +import org.thingsboard.server.common.msg.core.GetAttributesResponse; import org.thingsboard.server.common.msg.core.ResponseMsg; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; @@ -88,6 +90,14 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } } break; + case GET_ATTRIBUTES_RESPONSE: + GetAttributesResponse response = (GetAttributesResponse) msg; + if (response.isSuccess()) { + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response)); + } else { + //TODO: push error handling to the gateway + } + break; case ATTRIBUTES_UPDATE_NOTIFICATION: AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); @@ -117,6 +127,39 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { return 0; } + private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { + JsonObject result = new JsonObject(); + result.addProperty("id", response.getRequestId()); + result.addProperty("device", device.getName()); + if (response.getData().isPresent()) { + AttributesKVMsg msg = response.getData().get(); + if (msg.getClientAttributes() != null) { + msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + if (msg.getSharedAttributes() != null) { + msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + } + return createMqttPublishMsg(topic, result); + } + + private void addValueToJson(JsonObject json, String name, KvEntry entry) { + switch (entry.getDataType()) { + case BOOLEAN: + json.addProperty(name, entry.getBooleanValue().get()); + break; + case STRING: + json.addProperty(name, entry.getStrValue().get()); + break; + case DOUBLE: + json.addProperty(name, entry.getDoubleValue().get()); + break; + case LONG: + json.addProperty(name, entry.getLongValue().get()); + break; + } + } + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { JsonObject result = new JsonObject(); result.addProperty("device", device.getName()); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index e0929e332a..aaec17c37f 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,9 +15,10 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.gson.*; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -27,28 +28,23 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.msg.core.*; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; -import java.nio.charset.Charset; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.thingsboard.server.common.msg.session.MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; /** @@ -165,6 +161,29 @@ public class GatewaySessionCtx { } } + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + int requestId = jsonObj.get("id").getAsInt(); + String deviceName = jsonObj.get("device").getAsString(); + boolean clientScope = jsonObj.get("client").getAsBoolean(); + String key = jsonObj.get("key").getAsString(); + + BasicGetAttributesRequest request; + if (clientScope) { + request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null); + } else { + request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key)); + } + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + private String checkDeviceConnected(String deviceName) { if (!devices.containsKey(deviceName)) { throw new RuntimeException("Device is not connected!"); From f5c7e5325686688defae4aff16a13c9a077cea14 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 7 Mar 2017 19:19:35 +0200 Subject: [PATCH 4/4] TB-41: Implementation --- .../server/transport/mqtt/session/GatewaySessionCtx.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index aaec17c37f..879526984d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -128,7 +128,7 @@ public class GatewaySessionCtx { if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); String deviceName = checkDeviceConnected(jsonObj.get("device").getAsString()); - Integer requestId = jsonObj.get("requestId").getAsInt(); + Integer requestId = jsonObj.get("id").getAsInt(); String data = jsonObj.get("data").getAsString(); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),