Improvements
This commit is contained in:
parent
15f442fb18
commit
ccd4f4b096
@ -18,6 +18,8 @@ package org.thingsboard.server.common.msg.core;
|
|||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
import org.thingsboard.server.common.msg.session.MsgType;
|
import org.thingsboard.server.common.msg.session.MsgType;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@ToString
|
@ToString
|
||||||
@ -28,6 +30,10 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
|
|||||||
private final Set<String> clientKeys;
|
private final Set<String> clientKeys;
|
||||||
private final Set<String> sharedKeys;
|
private final Set<String> sharedKeys;
|
||||||
|
|
||||||
|
public BasicGetAttributesRequest(Integer requestId) {
|
||||||
|
this(requestId, Collections.emptySet(), Collections.emptySet());
|
||||||
|
}
|
||||||
|
|
||||||
public BasicGetAttributesRequest(Integer requestId, Set<String> clientKeys, Set<String> sharedKeys) {
|
public BasicGetAttributesRequest(Integer requestId, Set<String> clientKeys, Set<String> sharedKeys) {
|
||||||
super(requestId);
|
super(requestId);
|
||||||
this.clientKeys = clientKeys;
|
this.clientKeys = clientKeys;
|
||||||
@ -40,13 +46,13 @@ public class BasicGetAttributesRequest extends BasicRequest implements GetAttrib
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getClientAttributeNames() {
|
public Optional<Set<String>> getClientAttributeNames() {
|
||||||
return clientKeys;
|
return Optional.of(clientKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getSharedAttributeNames() {
|
public Optional<Set<String>> getSharedAttributeNames() {
|
||||||
return sharedKeys;
|
return Optional.ofNullable(sharedKeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.common.msg.core;
|
package org.thingsboard.server.common.msg.core;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
||||||
@ -22,7 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
|
|||||||
|
|
||||||
public interface GetAttributesRequest extends FromDeviceRequestMsg {
|
public interface GetAttributesRequest extends FromDeviceRequestMsg {
|
||||||
|
|
||||||
Set<String> getClientAttributeNames();
|
Optional<Set<String>> getClientAttributeNames();
|
||||||
Set<String> getSharedAttributeNames();
|
Optional<Set<String>> getSharedAttributeNames();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,10 +24,6 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class AttributesSubscriptionCmd extends SubscriptionCmd {
|
public class AttributesSubscriptionCmd extends SubscriptionCmd {
|
||||||
|
|
||||||
public AttributesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe) {
|
|
||||||
super(cmdId, deviceId, keys, unsubscribe);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SubscriptionType getType() {
|
public SubscriptionType getType() {
|
||||||
return SubscriptionType.ATTRIBUTES;
|
return SubscriptionType.ATTRIBUTES;
|
||||||
|
|||||||
@ -26,6 +26,7 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
|
|||||||
private int cmdId;
|
private int cmdId;
|
||||||
private String deviceId;
|
private String deviceId;
|
||||||
private String keys;
|
private String keys;
|
||||||
|
private String scope;
|
||||||
private boolean unsubscribe;
|
private boolean unsubscribe;
|
||||||
|
|
||||||
public abstract SubscriptionType getType();
|
public abstract SubscriptionType getType();
|
||||||
@ -62,6 +63,14 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
|
|||||||
this.unsubscribe = unsubscribe;
|
this.unsubscribe = unsubscribe;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getScope() {
|
||||||
|
return scope;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeys(String keys) {
|
||||||
|
this.keys = keys;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
|
return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";
|
||||||
|
|||||||
@ -26,11 +26,6 @@ public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
|
|||||||
|
|
||||||
private long timeWindow;
|
private long timeWindow;
|
||||||
|
|
||||||
public TimeseriesSubscriptionCmd(int cmdId, String deviceId, String keys, boolean unsubscribe, long timeWindow) {
|
|
||||||
super(cmdId, deviceId, keys, unsubscribe);
|
|
||||||
this.timeWindow = timeWindow;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getTimeWindow() {
|
public long getTimeWindow() {
|
||||||
return timeWindow;
|
return timeWindow;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -58,10 +58,14 @@ public class TelemetryRuleMsgHandler extends DefaultRuleMsgHandler {
|
|||||||
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
|
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, response));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Set<String> names) {
|
private List<AttributeKvEntry> getAttributeKvEntries(PluginContext ctx, DeviceId deviceId, String scope, Optional<Set<String>> names) {
|
||||||
List<AttributeKvEntry> attributes;
|
List<AttributeKvEntry> attributes;
|
||||||
if (!names.isEmpty()) {
|
if (names.isPresent()) {
|
||||||
attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names));
|
if (!names.get().isEmpty()) {
|
||||||
|
attributes = ctx.loadAttributes(deviceId, scope, new ArrayList<>(names.get()));
|
||||||
|
} else {
|
||||||
|
attributes = ctx.loadAttributes(deviceId, scope);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
attributes = Collections.emptyList();
|
attributes = Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -105,7 +105,12 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
if (keysOptional.isPresent()) {
|
if (keysOptional.isPresent()) {
|
||||||
List<String> keys = new ArrayList<>(keysOptional.get());
|
List<String> keys = new ArrayList<>(keysOptional.get());
|
||||||
List<AttributeKvEntry> data = new ArrayList<>();
|
List<AttributeKvEntry> data = new ArrayList<>();
|
||||||
Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
|
if (StringUtils.isEmpty(cmd.getScope())) {
|
||||||
|
Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys)));
|
||||||
|
} else {
|
||||||
|
data.addAll(ctx.loadAttributes(deviceId, cmd.getScope(), keys));
|
||||||
|
}
|
||||||
|
|
||||||
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
|
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
|
||||||
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
|
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
|
||||||
|
|
||||||
@ -116,7 +121,11 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
|
sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState);
|
||||||
} else {
|
} else {
|
||||||
List<AttributeKvEntry> data = new ArrayList<>();
|
List<AttributeKvEntry> data = new ArrayList<>();
|
||||||
Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
|
if (StringUtils.isEmpty(cmd.getScope())) {
|
||||||
|
Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s)));
|
||||||
|
} else {
|
||||||
|
data.addAll(ctx.loadAttributes(deviceId, cmd.getScope()));
|
||||||
|
}
|
||||||
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
|
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
|
||||||
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
|
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData));
|
||||||
|
|
||||||
|
|||||||
@ -167,17 +167,13 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
|
|||||||
|
|
||||||
private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
|
private FromDeviceMsg convertToGetAttributesRequest(SessionContext ctx, Request inbound) throws AdaptorException {
|
||||||
List<String> queryElements = inbound.getOptions().getUriQuery();
|
List<String> queryElements = inbound.getOptions().getUriQuery();
|
||||||
if (queryElements == null || queryElements.size() == 0) {
|
if (queryElements != null || queryElements.size() > 0) {
|
||||||
log.warn("[{}] Query is empty!", ctx.getSessionId());
|
Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
|
||||||
throw new AdaptorException(new IllegalArgumentException("Query is empty!"));
|
Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
|
||||||
|
return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
|
||||||
|
} else {
|
||||||
|
return new BasicGetAttributesRequest(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<String> clientKeys = toKeys(ctx, queryElements, "clientKeys");
|
|
||||||
Set<String> sharedKeys = toKeys(ctx, queryElements, "sharedKeys");
|
|
||||||
if (clientKeys.isEmpty() && sharedKeys.isEmpty()) {
|
|
||||||
throw new AdaptorException("No clientKeys and serverKeys parameters!");
|
|
||||||
}
|
|
||||||
return new BasicGetAttributesRequest(0, clientKeys, sharedKeys);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException {
|
private Set<String> toKeys(SessionContext ctx, List<String> queryElements, String attributeName) throws AdaptorException {
|
||||||
@ -191,7 +187,7 @@ public class JsonCoapAdaptor implements CoapTransportAdaptor {
|
|||||||
if (!StringUtils.isEmpty(keys)) {
|
if (!StringUtils.isEmpty(keys)) {
|
||||||
return new HashSet<>(Arrays.asList(keys.split(",")));
|
return new HashSet<>(Arrays.asList(keys.split(",")));
|
||||||
} else {
|
} else {
|
||||||
return Collections.emptySet();
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -182,7 +182,7 @@ public class CoapServerTest {
|
|||||||
public void testNoKeysAttributesGetRequest() {
|
public void testNoKeysAttributesGetRequest() {
|
||||||
CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2");
|
CoapClient client = new CoapClient(getBaseTestUrl() + DEVICE1_TOKEN + "/" + FeatureType.ATTRIBUTES.name().toLowerCase() + "?data=key1,key2");
|
||||||
CoapResponse response = client.setTimeout(6000).get();
|
CoapResponse response = client.setTimeout(6000).get();
|
||||||
Assert.assertEquals(ResponseCode.BAD_REQUEST, response.getCode());
|
Assert.assertEquals(ResponseCode.CONTENT, response.getCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@ -38,6 +38,7 @@ import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
|||||||
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
|
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@ -60,20 +61,22 @@ public class DeviceApiController {
|
|||||||
|
|
||||||
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
|
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
|
||||||
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
|
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
|
||||||
@RequestParam(value = "clientKeys", required = false) String clientKeys,
|
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
|
||||||
@RequestParam(value = "sharedKeys", required = false) String sharedKeys) {
|
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys) {
|
||||||
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
|
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
|
||||||
if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
|
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
||||||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
|
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
||||||
} else {
|
GetAttributesRequest request;
|
||||||
HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
|
if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
|
||||||
if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
|
request = new BasicGetAttributesRequest(0);
|
||||||
Set<String> clientKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));
|
|
||||||
Set<String> sharedKeySet = new HashSet<>(Arrays.asList(clientKeys.split(",")));
|
|
||||||
process(ctx, new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet));
|
|
||||||
} else {
|
} else {
|
||||||
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
|
||||||
|
Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
|
||||||
|
request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
|
||||||
}
|
}
|
||||||
|
process(ctx, request);
|
||||||
|
} else {
|
||||||
|
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
|
||||||
}
|
}
|
||||||
|
|
||||||
return responseWriter;
|
return responseWriter;
|
||||||
|
|||||||
@ -162,8 +162,13 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
|
|||||||
Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
|
Integer requestId = Integer.valueOf(topicName.substring(MqttTransportHandler.ATTRIBUTES_REQUEST_TOPIC_PREFIX.length()));
|
||||||
String payload = inbound.payload().toString(UTF8);
|
String payload = inbound.payload().toString(UTF8);
|
||||||
JsonElement requestBody = new JsonParser().parse(payload);
|
JsonElement requestBody = new JsonParser().parse(payload);
|
||||||
return new BasicGetAttributesRequest(requestId,
|
Set<String> clientKeys = toStringSet(requestBody, "clientKeys");
|
||||||
toStringSet(requestBody, "clientKeys"), toStringSet(requestBody, "sharedKeys"));
|
Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys");
|
||||||
|
if (clientKeys == null && sharedKeys == null) {
|
||||||
|
return new BasicGetAttributesRequest(requestId);
|
||||||
|
} else {
|
||||||
|
return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys);
|
||||||
|
}
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
log.warn("Failed to decode get attributes request", e);
|
log.warn("Failed to decode get attributes request", e);
|
||||||
throw new AdaptorException(e);
|
throw new AdaptorException(e);
|
||||||
@ -189,7 +194,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
|
|||||||
if (element != null) {
|
if (element != null) {
|
||||||
return new HashSet<>(Arrays.asList(element.getAsString().split(",")));
|
return new HashSet<>(Arrays.asList(element.getAsString().split(",")));
|
||||||
} else {
|
} else {
|
||||||
return Collections.emptySet();
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -293,7 +293,8 @@ function DeviceService($http, $q, $filter, telemetryWebsocketService, types) {
|
|||||||
var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId];
|
var deviceAttributesSubscription = deviceAttributesSubscriptionMap[subscriptionId];
|
||||||
if (!deviceAttributesSubscription) {
|
if (!deviceAttributesSubscription) {
|
||||||
var subscriptionCommand = {
|
var subscriptionCommand = {
|
||||||
deviceId: deviceId
|
deviceId: deviceId,
|
||||||
|
scope: attributeScope
|
||||||
};
|
};
|
||||||
|
|
||||||
var type = attributeScope === types.latestTelemetry.value ?
|
var type = attributeScope === types.latestTelemetry.value ?
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user