Ability to fetch multiple device attributes using Gateway MQTT API

This commit is contained in:
Andrew Shvayka 2017-11-29 10:51:57 +02:00
parent f7b0ec98b6
commit 336f0dc079
2 changed files with 38 additions and 26 deletions

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -24,6 +25,7 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.mqtt.*; import io.netty.handler.codec.mqtt.*;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.msg.core.*; import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.kv.AttributesKVMsg; import org.thingsboard.server.common.msg.kv.AttributesKVMsg;
@ -35,6 +37,7 @@ import org.thingsboard.server.transport.mqtt.MqttTopics;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -135,40 +138,43 @@ public class GatewayDeviceSessionCtx extends DeviceAwareSessionContext {
if (responseData.isPresent()) { if (responseData.isPresent()) {
AttributesKVMsg msg = responseData.get(); AttributesKVMsg msg = responseData.get();
if (msg.getClientAttributes() != null) { if (msg.getClientAttributes() != null) {
msg.getClientAttributes().forEach(v -> addValueToJson(result, "value", v)); addValues(result, msg.getClientAttributes());
} }
if (msg.getSharedAttributes() != null) { if (msg.getSharedAttributes() != null) {
msg.getSharedAttributes().forEach(v -> addValueToJson(result, "value", v)); addValues(result, msg.getSharedAttributes());
} }
} }
return createMqttPublishMsg(topic, result); return createMqttPublishMsg(topic, result);
} }
private void addValues(JsonObject result, List<AttributeKvEntry> kvList) {
if (kvList.size() == 1) {
addValueToJson(result, "value", kvList.get(0));
} else {
JsonObject values;
if (result.has("values")) {
values = result.get("values").getAsJsonObject();
} else {
values = new JsonObject();
result.add("values", values);
}
kvList.forEach(value -> addValueToJson(values, value.getKey(), value));
}
}
private void addValueToJson(JsonObject json, String name, KvEntry entry) { private void addValueToJson(JsonObject json, String name, KvEntry entry) {
switch (entry.getDataType()) { switch (entry.getDataType()) {
case BOOLEAN: case BOOLEAN:
Optional<Boolean> booleanValue = entry.getBooleanValue(); entry.getBooleanValue().ifPresent(aBoolean -> json.addProperty(name, aBoolean));
if (booleanValue.isPresent()) {
json.addProperty(name, booleanValue.get());
}
break; break;
case STRING: case STRING:
Optional<String> stringValue = entry.getStrValue(); entry.getStrValue().ifPresent(aString -> json.addProperty(name, aString));
if (stringValue.isPresent()) {
json.addProperty(name, stringValue.get());
}
break; break;
case DOUBLE: case DOUBLE:
Optional<Double> doubleValue = entry.getDoubleValue(); entry.getDoubleValue().ifPresent(aDouble -> json.addProperty(name, aDouble));
if (doubleValue.isPresent()) {
json.addProperty(name, doubleValue.get());
}
break; break;
case LONG: case LONG:
Optional<Long> longValue = entry.getLongValue(); entry.getLongValue().ifPresent(aLong -> json.addProperty(name, aLong));
if (longValue.isPresent()) {
json.addProperty(name, longValue.get());
}
break; break;
} }
} }

View File

@ -41,10 +41,7 @@ import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import java.util.Collections; import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload; import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
@ -193,13 +190,22 @@ public class GatewaySessionCtx {
int requestId = jsonObj.get("id").getAsInt(); int requestId = jsonObj.get("id").getAsInt();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
boolean clientScope = jsonObj.get("client").getAsBoolean(); boolean clientScope = jsonObj.get("client").getAsBoolean();
String key = jsonObj.get("key").getAsString(); Set<String> keys;
if (jsonObj.has("key")) {
keys = Collections.singleton(jsonObj.get("key").getAsString());
} else {
JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
keys = new HashSet<>();
for (JsonElement keyObj : keysArray) {
keys.add(keyObj.getAsString());
}
}
BasicGetAttributesRequest request; BasicGetAttributesRequest request;
if (clientScope) { if (clientScope) {
request = new BasicGetAttributesRequest(requestId, Collections.singleton(key), null); request = new BasicGetAttributesRequest(requestId, keys, null);
} else { } else {
request = new BasicGetAttributesRequest(requestId, null, Collections.singleton(key)); request = new BasicGetAttributesRequest(requestId, null, keys);
} }
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(),
@ -251,7 +257,7 @@ public class GatewaySessionCtx {
} }
private void ack(MqttPublishMessage msg) { private void ack(MqttPublishMessage msg) {
if(msg.variableHeader().messageId() > 0) { if (msg.variableHeader().messageId() > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId())); writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
} }
} }