From 9305cbae1974bb8508679a5440a9dac0c685f9bf Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Fri, 3 Mar 2017 16:42:36 +0200 Subject: [PATCH] TB-41: Implementation --- .../msg/core/BasicGetAttributesRequest.java | 2 +- .../server/transport/mqtt/MqttTopics.java | 2 + .../transport/mqtt/MqttTransportHandler.java | 2 + .../mqtt/session/GatewayDeviceSessionCtx.java | 43 ++++++++++++++++++ .../mqtt/session/GatewaySessionCtx.java | 45 +++++++++++++------ 5 files changed, 80 insertions(+), 14 deletions(-) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java index b5f7c67e68..3db4138f8c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java @@ -47,7 +47,7 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib @Override public Optional> getClientAttributeNames() { - return Optional.of(clientKeys); + return Optional.ofNullable(clientKeys); } @Override diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java index 0d67881f41..c273b8e905 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopics.java @@ -37,6 +37,8 @@ public class MqttTopics { public static final String GATEWAY_ATTRIBUTES_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes"; public static final String GATEWAY_TELEMETRY_TOPIC = BASE_GATEWAY_API_TOPIC + "/telemetry"; public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + "/rpc"; + public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/request"; + public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + "/attributes/response"; private MqttTopics() { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 3300d3e847..76cdf1ac5d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -134,6 +134,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionCtx.onDeviceTelemetry(mqttMsg); } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { gatewaySessionCtx.onDeviceAttributes(mqttMsg); + } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { + gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index 4a0c861db0..0a023eebad 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -24,7 +24,9 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.mqtt.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.core.AttributesUpdateNotification; +import org.thingsboard.server.common.msg.core.GetAttributesResponse; import org.thingsboard.server.common.msg.core.ResponseMsg; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.kv.AttributesKVMsg; @@ -88,6 +90,14 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { } } break; + case GET_ATTRIBUTES_RESPONSE: + GetAttributesResponse response = (GetAttributesResponse) msg; + if (response.isSuccess()) { + return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, response)); + } else { + //TODO: push error handling to the gateway + } + break; case ATTRIBUTES_UPDATE_NOTIFICATION: AttributesUpdateNotification notification = (AttributesUpdateNotification) msg; return Optional.of(createMqttPublishMsg(MqttTopics.GATEWAY_ATTRIBUTES_TOPIC, notification.getData())); @@ -117,6 +127,39 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext { return 0; } + private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) { + JsonObject result = new JsonObject(); + result.addProperty("id", response.getRequestId()); + result.addProperty("device", device.getName()); + if (response.getData().isPresent()) { + AttributesKVMsg msg = response.getData().get(); + if (msg.getClientAttributes() != null) { + msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + if (msg.getSharedAttributes() != null) { + msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v)); + } + } + return createMqttPublishMsg(topic, result); + } + + private void addValueToJson(JsonObject json, String name, KvEntry entry) { + switch (entry.getDataType()) { + case BOOLEAN: + json.addProperty(name, entry.getBooleanValue().get()); + break; + case STRING: + json.addProperty(name, entry.getStrValue().get()); + break; + case DOUBLE: + json.addProperty(name, entry.getDoubleValue().get()); + break; + case LONG: + json.addProperty(name, entry.getLongValue().get()); + break; + } + } + private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) { JsonObject result = new JsonObject(); result.addProperty("device", device.getName()); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index e0929e332a..aaec17c37f 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,9 +15,10 @@ */ package org.thingsboard.server.transport.mqtt.session; -import com.google.gson.*; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; @@ -27,28 +28,23 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.msg.core.*; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; import org.thingsboard.server.common.msg.session.BasicToDeviceActorSessionMsg; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; -import java.nio.charset.Charset; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.thingsboard.server.common.msg.session.MsgType.SUBSCRIBE_ATTRIBUTES_REQUEST; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; /** @@ -165,6 +161,29 @@ public class GatewaySessionCtx { } } + public void onDeviceAttributesRequest(MqttPublishMessage mqttMsg) throws AdaptorException { + JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); + if (json.isJsonObject()) { + JsonObject jsonObj = json.getAsJsonObject(); + int requestId = jsonObj.get("id").getAsInt(); + String deviceName = jsonObj.get("device").getAsString(); + boolean clientScope = jsonObj.get("client").getAsBoolean(); + String key = jsonObj.get("key").getAsString(); + + BasicGetAttributesRequest request; + if (clientScope) { + request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null); + } else { + request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key)); + } + GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); + processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), + new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); + } else { + throw new JsonSyntaxException("Can't parse value: " + json); + } + } + private String checkDeviceConnected(String deviceName) { if (!devices.containsKey(deviceName)) { throw new RuntimeException("Device is not connected!");