diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 34ae9b2110..a446680bee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -29,6 +29,8 @@ public class DataConstants { public static final String SERVER_SCOPE = "SERVER_SCOPE"; public static final String SHARED_SCOPE = "SHARED_SCOPE"; + public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE}; + public static final String ALARM = "ALARM"; public static final String ERROR = "ERROR"; public static final String LC_EVENT = "LC_EVENT"; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java index a8fdc86454..676cfb795e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/BasicGetAttributesRequest.java @@ -18,6 +18,8 @@ package org.thingsboard.server.common.msg.core; import lombok.ToString; import org.thingsboard.server.common.msg.session.MsgType; +import java.util.Collections; +import java.util.Optional; import java.util.Set; @ToString @@ -28,6 +30,10 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib private final Set clientKeys; private final Set sharedKeys; + public BasicGetAttributesRequest(Integer requestId) { + this(requestId, Collections.emptySet(), Collections.emptySet()); + } + public BasicGetAttributesRequest(Integer requestId, Set clientKeys, Set sharedKeys) { super(requestId); this.clientKeys = clientKeys; @@ -40,13 +46,13 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib } @Override - public Set getClientAttributeNames() { - return clientKeys; + public Optional> getClientAttributeNames() { + return Optional.of(clientKeys); } @Override - public Set getSharedAttributeNames() { - return sharedKeys; + public Optional> getSharedAttributeNames() { + return Optional.ofNullable(sharedKeys); } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java index 49bca53951..0a9e1c2a9e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/core/GetAttributesRequest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.msg.core; +import java.util.Optional; import java.util.Set; import org.thingsboard.server.common.msg.session.FromDeviceMsg; @@ -22,7 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; public interface GetAttributesRequest extends FromDeviceRequestMsg { - Set getClientAttributeNames(); - Set getSharedAttributeNames(); + Optional> getClientAttributeNames(); + Optional> getSharedAttributeNames(); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java index 190d9ffa0c..07c9629f2d 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java @@ -175,6 +175,23 @@ public class SubscriptionManager { } } + public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List attributes) { + Optional serverAddress = ctx.resolve(deviceId); + if (!serverAddress.isPresent()) { + onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> { + List subscriptionUpdate = new ArrayList(); + for (AttributeKvEntry kv : attributes) { + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv)); + } + } + return subscriptionUpdate; + }); + } else { + rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes); + } + } + private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) { log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update); update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue())); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java index 63d145ded9..8668639ea5 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java @@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s))); } List keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); @@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { if (!StringUtils.isEmpty(scope)) { attributes = getAttributeKvEntries(ctx, scope, deviceId, keys); } else { - attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys); - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys)); - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys)); + attributes = new ArrayList<>(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys))); } List values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); @@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @Override public void onSuccess(PluginContext ctx, Void value) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); + subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes); } @Override @@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { DeviceId deviceId = DeviceId.fromString(pathParams[0]); String scope = pathParams[1]; if (DataConstants.SERVER_SCOPE.equals(scope) || - DataConstants.SHARED_SCOPE.equals(scope)) { + DataConstants.SHARED_SCOPE.equals(scope) || + DataConstants.CLIENT_SCOPE.equals(scope)) { String keysParam = request.getParameter("keys"); if (!StringUtils.isEmpty(keysParam)) { String[] keys = keysParam.split(","); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java index 06467fe1a1..e59fa64330 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler; @@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { private final SubscriptionManager subscriptionManager; private static final int SUBSCRIPTION_CLAZZ = 1; - private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2; - private static final int SESSION_CLOSE_CLAZZ = 3; - private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4; + private static final int ATTRIBUTES_UPDATE_CLAZZ = 2; + private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3; + private static final int SESSION_CLOSE_CLAZZ = 4; + private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5; @Override public void process(PluginContext ctx, RpcMsg msg) { @@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { case SUBSCRIPTION_UPDATE_CLAZZ: processRemoteSubscriptionUpdate(ctx, msg); break; + case ATTRIBUTES_UPDATE_CLAZZ: + processAttributeUpdate(ctx, msg); + break; case SESSION_CLOSE_CLAZZ: processSessionClose(ctx, msg); break; @@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto)); } + private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) { + AttributeUpdateProto proto; + try { + proto = AttributeUpdateProto.parseFrom(msg.getMsgData()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(), + proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList())); + } + private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) { SubscriptionProto proto; try { @@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { } else { Map> data = new TreeMap<>(); proto.getDataList().forEach(v -> { - List values = data.get(v.getKey()); - if (values == null) { - values = new ArrayList<>(); - data.put(v.getKey(), values); - } + List values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); for (int i = 0; i < v.getTsCount(); i++) { Object[] value = new Object[2]; value[0] = v.getTs(i); @@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { return new SubscriptionUpdate(proto.getSubscriptionId(), data); } } + + public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List attributes) { + ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray())); + } + + private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List attributes) { + AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder(); + builder.setDeviceId(deviceId.toString()); + builder.setScope(scope); + attributes.forEach( + attr -> { + AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder(); + dataBuilder.setKey(attr.getKey()); + dataBuilder.setTs(attr.getLastUpdateTs()); + dataBuilder.setValueType(attr.getDataType().ordinal()); + switch (attr.getDataType()) { + case BOOLEAN: + dataBuilder.setBoolValue(attr.getBooleanValue().get()); + break; + case LONG: + dataBuilder.setLongValue(attr.getLongValue().get()); + break; + case DOUBLE: + dataBuilder.setDoubleValue(attr.getDoubleValue().get()); + break; + case STRING: + dataBuilder.setStrValue(attr.getStrValue().get()); + break; + } + builder.addData(dataBuilder.build()); + } + ); + return builder.build(); + } + + private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) { + KvEntry entry = null; + DataType type = DataType.values()[proto.getValueType()]; + switch (type) { + case BOOLEAN: + entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue()); + break; + case LONG: + entry = new LongDataEntry(proto.getKey(), proto.getLongValue()); + break; + case DOUBLE: + entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue()); + break; + case STRING: + entry = new StringDataEntry(proto.getKey(), proto.getStrValue()); + break; + } + return new BaseAttributeKvEntry(entry, proto.getTs()); + } + } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java index f69d17b9f1..f14d25d016 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRuleMsgHandler.java @@ -58,10 +58,14 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler { ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response)); } - private List getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Set names) { + private List getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional> names) { List attributes; - if (!names.isEmpty()) { - attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names)); + if (names.isPresent()) { + if (!names.get().isEmpty()) { + attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get())); + } else { + attributes = ctx.loadAttributes(deviceId, scope); + } } else { attributes = Collections.emptyList(); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 8e2d62abeb..f268dd8ce1 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -104,7 +104,13 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { SubscriptionState sub; if (keysOptional.isPresent()) { List keys = new ArrayList<>(keysOptional.get()); - List data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys); + List data = new ArrayList<>(); + if (StringUtils.isEmpty(cmd.getScope())) { + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys))); + } else { + data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys)); + } + List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); @@ -114,7 +120,12 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); } else { - List data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE); + List data = new ArrayList<>(); + if (StringUtils.isEmpty(cmd.getScope())) { + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s))); + } else { + data.addAll(ctx.loadAttributes(deviceId, cmd.getScope())); + } List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto index 5c7d7a483b..60e40f7748 100644 --- a/extensions-core/src/main/proto/telemetry.proto +++ b/extensions-core/src/main/proto/telemetry.proto @@ -36,6 +36,12 @@ message SubscriptionUpdateProto { repeated SubscriptionUpdateValueListProto data = 5; } +message AttributeUpdateProto { + string deviceId = 1; + string scope = 2; + repeated AttributeUpdateValueListProto data = 3; +} + message SessionCloseProto { string sessionId = 1; } @@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto { string key = 1; repeated int64 ts = 2; repeated string value = 3; +} + +message AttributeUpdateValueListProto { + string key = 1; + int64 ts = 2; + int32 valueType = 3; + string strValue = 4; + int64 longValue = 5; + double doubleValue = 6; + bool boolValue = 7; } \ No newline at end of file diff --git a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index 26a905652a..a9c6086eea 100644 --- a/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -167,17 +167,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException { List queryElements = inbound.getOptions().getUriQuery(); - if (queryElements == null || queryElements.size() == 0) { - log.warn("[{}] Query is empty!", ctx.getSessionId()); - throw new AdaptorException(new IllegalArgumentException("Query is empty!")); + if (queryElements != null || queryElements.size() > 0) { + Set clientKeys = toKeys(ctx, queryElements, "clientKeys"); + Set sharedKeys = toKeys(ctx, queryElements, "sharedKeys"); + return new BasicGetAttributesRequest(0, clientKeys, sharedKeys); + } else { + return new BasicGetAttributesRequest(0); } - - Set clientKeys = toKeys(ctx, queryElements, "clientKeys"); - Set sharedKeys = toKeys(ctx, queryElements, "sharedKeys"); - if (clientKeys.isEmpty() && sharedKeys.isEmpty()) { - throw new AdaptorException("No clientKeys and serverKeys parameters!"); - } - return new BasicGetAttributesRequest(0, clientKeys, sharedKeys); } private Set toKeys(SessionContext ctx, List queryElements, String attributeName) throws AdaptorException { @@ -191,7 +187,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor { if (!StringUtils.isEmpty(keys)) { return new HashSet<>(Arrays.asList(keys.split(","))); } else { - return Collections.emptySet(); + return null; } } diff --git a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java index a2d6c25f69..ceb6813ad7 100644 --- a/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java +++ b/transport/coap/src/test/java/org/thingsboard/server/transport/coap/CoapServerTest.java @@ -182,7 +182,7 @@ public class CoapServerTest { public void testNoKeysAttributesGetRequest() { CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2"); CoapResponse response = client.setTimeout(6000).get(); - Assert.assertEquals(ResponseCode.BAD_REQUEST, response.getCode()); + Assert.assertEquals(ResponseCode.CONTENT, response.getCode()); } @Test diff --git a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index e3e06663f2..e815bc5de9 100644 --- a/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -38,6 +38,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.transport.http.session.HttpSessionCtx; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -60,20 +61,22 @@ public class DeviceApiController { @RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json") public DeferredResult getDeviceAttributes(@PathVariable("deviceToken") String deviceToken, - @RequestParam(value = "clientKeys", required = false) String clientKeys, - @RequestParam(value = "sharedKeys", required = false) String sharedKeys) { + @RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys, + @RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) { DeferredResult responseWriter = new DeferredResult(); - if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - } else { - HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); - if (ctx.login(new DeviceTokenCredentials(deviceToken))) { - Set clientKeySet = new HashSet<>(Arrays.asList(clientKeys.split(","))); - Set sharedKeySet = new HashSet<>(Arrays.asList(clientKeys.split(","))); - process(ctx, new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet)); + HttpSessionCtx ctx = getHttpSessionCtx(responseWriter); + if (ctx.login(new DeviceTokenCredentials(deviceToken))) { + GetAttributesRequest request; + if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) { + request = new BasicGetAttributesRequest(0); } else { - responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); + Set clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null; + Set sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null; + request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet); } + process(ctx, request); + } else { + responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED)); } return responseWriter; diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 5af244ad74..e84e848541 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -162,8 +162,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())); String payload = inbound.payload().toString(UTF8); JsonElement requestBody = new JsonParser().parse(payload); - return new BasicGetAttributesRequest(requestId, - toStringSet(requestBody, "clientKeys"), toStringSet(requestBody, "sharedKeys")); + Set clientKeys = toStringSet(requestBody, "clientKeys"); + Set sharedKeys = toStringSet(requestBody, "sharedKeys"); + if (clientKeys == null && sharedKeys == null) { + return new BasicGetAttributesRequest(requestId); + } else { + return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys); + } } catch (RuntimeException e) { log.warn("Failed to decode get attributes request", e); throw new AdaptorException(e); @@ -189,7 +194,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { if (element != null) { return new HashSet<>(Arrays.asList(element.getAsString().split(","))); } else { - return Collections.emptySet(); + return null; } } diff --git a/ui/src/app/api/device.service.js b/ui/src/app/api/device.service.js index cf38d12c23..27cf605c6b 100644 --- a/ui/src/app/api/device.service.js +++ b/ui/src/app/api/device.service.js @@ -293,7 +293,8 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) { var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId]; if (!deviceAttributesSubscription) { var subscriptionCommand = { - deviceId: deviceId + deviceId: deviceId, + scope: attributeScope }; var type = attributeScope === types.latestTelemetry.value ?