diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 34ae9b2110..a446680bee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -29,6 +29,8 @@ public class DataConstants { public static final String SERVER_SCOPE = "SERVER_SCOPE"; public static final String SHARED_SCOPE = "SHARED_SCOPE"; + public static final String[] ALL_SCOPES = {CLIENT_SCOPE, SHARED_SCOPE, SERVER_SCOPE}; + public static final String ALARM = "ALARM"; public static final String ERROR = "ERROR"; public static final String LC_EVENT = "LC_EVENT"; diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java index 190d9ffa0c..07c9629f2d 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java @@ -175,6 +175,23 @@ public class SubscriptionManager { } } + public void onAttributesUpdateFromServer(PluginContext ctx, DeviceId deviceId, String scope, List attributes) { + Optional serverAddress = ctx.resolve(deviceId); + if (!serverAddress.isPresent()) { + onLocalSubscriptionUpdate(ctx, deviceId, SubscriptionType.ATTRIBUTES, s -> { + List subscriptionUpdate = new ArrayList(); + for (AttributeKvEntry kv : attributes) { + if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { + subscriptionUpdate.add(new BasicTsKvEntry(kv.getLastUpdateTs(), kv)); + } + } + return subscriptionUpdate; + }); + } else { + rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), deviceId, scope, attributes); + } + } + private void updateSubscriptionState(String sessionId, Subscription subState, SubscriptionUpdate update) { log.trace("[{}] updating subscription state {} using onUpdate {}", sessionId, subState, update); update.getLatestValues().entrySet().forEach(e -> subState.setKeyState(e.getKey(), e.getValue())); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java index 63d145ded9..8668639ea5 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/TelemetryStoragePlugin.java @@ -43,7 +43,7 @@ public class TelemetryStoragePlugin extends AbstractPlugin(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(ctx.loadAttributes(deviceId, s))); } List keys = attributes.stream().map(attrKv -> attrKv.getKey()).collect(Collectors.toList()); msg.getResponseHolder().setResult(new ResponseEntity<>(keys, HttpStatus.OK)); @@ -99,9 +105,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { if (!StringUtils.isEmpty(scope)) { attributes = getAttributeKvEntries(ctx, scope, deviceId, keys); } else { - attributes = getAttributeKvEntries(ctx, DataConstants.CLIENT_SCOPE, deviceId, keys); - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SHARED_SCOPE, deviceId, keys)); - attributes.addAll(getAttributeKvEntries(ctx, DataConstants.SERVER_SCOPE, deviceId, keys)); + attributes = new ArrayList<>(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> attributes.addAll(getAttributeKvEntries(ctx, s, deviceId, keys))); } List values = attributes.stream().map(attribute -> new AttributeData(attribute.getLastUpdateTs(), attribute.getKey(), attribute.getValue())).collect(Collectors.toList()); @@ -145,6 +150,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { @Override public void onSuccess(PluginContext ctx, Void value) { msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.OK)); + subscriptionManager.onAttributesUpdateFromServer(ctx, deviceId, scope, attributes); } @Override @@ -172,7 +178,8 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { DeviceId deviceId = DeviceId.fromString(pathParams[0]); String scope = pathParams[1]; if (DataConstants.SERVER_SCOPE.equals(scope) || - DataConstants.SHARED_SCOPE.equals(scope)) { + DataConstants.SHARED_SCOPE.equals(scope) || + DataConstants.CLIENT_SCOPE.equals(scope)) { String keysParam = request.getParameter("keys"); if (!StringUtils.isEmpty(keysParam)) { String[] keys = keysParam.split(","); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java index 06467fe1a1..e59fa64330 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.extensions.api.plugins.PluginContext; import org.thingsboard.server.extensions.api.plugins.handlers.RpcMsgHandler; @@ -42,9 +43,10 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { private final SubscriptionManager subscriptionManager; private static final int SUBSCRIPTION_CLAZZ = 1; - private static final int SUBSCRIPTION_UPDATE_CLAZZ = 2; - private static final int SESSION_CLOSE_CLAZZ = 3; - private static final int SUBSCRIPTION_CLOSE_CLAZZ = 4; + private static final int ATTRIBUTES_UPDATE_CLAZZ = 2; + private static final int SUBSCRIPTION_UPDATE_CLAZZ = 3; + private static final int SESSION_CLOSE_CLAZZ = 4; + private static final int SUBSCRIPTION_CLOSE_CLAZZ = 5; @Override public void process(PluginContext ctx, RpcMsg msg) { @@ -55,6 +57,9 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { case SUBSCRIPTION_UPDATE_CLAZZ: processRemoteSubscriptionUpdate(ctx, msg); break; + case ATTRIBUTES_UPDATE_CLAZZ: + processAttributeUpdate(ctx, msg); + break; case SESSION_CLOSE_CLAZZ: processSessionClose(ctx, msg); break; @@ -76,6 +81,17 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { subscriptionManager.onRemoteSubscriptionUpdate(ctx, proto.getSessionId(), convert(proto)); } + private void processAttributeUpdate(PluginContext ctx, RpcMsg msg) { + AttributeUpdateProto proto; + try { + proto = AttributeUpdateProto.parseFrom(msg.getMsgData()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + subscriptionManager.onAttributesUpdateFromServer(ctx, DeviceId.fromString(proto.getDeviceId()), proto.getScope(), + proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList())); + } + private void processSubscriptionCmd(PluginContext ctx, RpcMsg msg) { SubscriptionProto proto; try { @@ -167,11 +183,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { } else { Map> data = new TreeMap<>(); proto.getDataList().forEach(v -> { - List values = data.get(v.getKey()); - if (values == null) { - values = new ArrayList<>(); - data.put(v.getKey(), values); - } + List values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); for (int i = 0; i < v.getTsCount(); i++) { Object[] value = new Object[2]; value[0] = v.getTs(i); @@ -182,4 +194,59 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { return new SubscriptionUpdate(proto.getSubscriptionId(), data); } } + + public void onAttributesUpdate(PluginContext ctx, ServerAddress address, DeviceId deviceId, String scope, List attributes) { + ctx.sendPluginRpcMsg(new RpcMsg(address, ATTRIBUTES_UPDATE_CLAZZ, getAttributesUpdateProto(deviceId, scope, attributes).toByteArray())); + } + + private AttributeUpdateProto getAttributesUpdateProto(DeviceId deviceId, String scope, List attributes) { + AttributeUpdateProto.Builder builder = AttributeUpdateProto.newBuilder(); + builder.setDeviceId(deviceId.toString()); + builder.setScope(scope); + attributes.forEach( + attr -> { + AttributeUpdateValueListProto.Builder dataBuilder = AttributeUpdateValueListProto.newBuilder(); + dataBuilder.setKey(attr.getKey()); + dataBuilder.setTs(attr.getLastUpdateTs()); + dataBuilder.setValueType(attr.getDataType().ordinal()); + switch (attr.getDataType()) { + case BOOLEAN: + dataBuilder.setBoolValue(attr.getBooleanValue().get()); + break; + case LONG: + dataBuilder.setLongValue(attr.getLongValue().get()); + break; + case DOUBLE: + dataBuilder.setDoubleValue(attr.getDoubleValue().get()); + break; + case STRING: + dataBuilder.setStrValue(attr.getStrValue().get()); + break; + } + builder.addData(dataBuilder.build()); + } + ); + return builder.build(); + } + + private AttributeKvEntry toAttribute(AttributeUpdateValueListProto proto) { + KvEntry entry = null; + DataType type = DataType.values()[proto.getValueType()]; + switch (type) { + case BOOLEAN: + entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue()); + break; + case LONG: + entry = new LongDataEntry(proto.getKey(), proto.getLongValue()); + break; + case DOUBLE: + entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue()); + break; + case STRING: + entry = new StringDataEntry(proto.getKey(), proto.getStrValue()); + break; + } + return new BaseAttributeKvEntry(entry, proto.getTs()); + } + } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 8e2d62abeb..849b3225d5 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -104,7 +104,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { SubscriptionState sub; if (keysOptional.isPresent()) { List keys = new ArrayList<>(keysOptional.get()); - List data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE, keys); + List data = new ArrayList<>(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s, keys))); List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); @@ -114,7 +115,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.ATTRIBUTES, false, subState); } else { - List data = ctx.loadAttributes(deviceId, DataConstants.CLIENT_SCOPE); + List data = new ArrayList<>(); + Arrays.stream(DataConstants.ALL_SCOPES).forEach(s -> data.addAll(ctx.loadAttributes(deviceId, s))); List attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList()); sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), attributesData)); diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto index 5c7d7a483b..60e40f7748 100644 --- a/extensions-core/src/main/proto/telemetry.proto +++ b/extensions-core/src/main/proto/telemetry.proto @@ -36,6 +36,12 @@ message SubscriptionUpdateProto { repeated SubscriptionUpdateValueListProto data = 5; } +message AttributeUpdateProto { + string deviceId = 1; + string scope = 2; + repeated AttributeUpdateValueListProto data = 3; +} + message SessionCloseProto { string sessionId = 1; } @@ -54,4 +60,14 @@ message SubscriptionUpdateValueListProto { string key = 1; repeated int64 ts = 2; repeated string value = 3; +} + +message AttributeUpdateValueListProto { + string key = 1; + int64 ts = 2; + int32 valueType = 3; + string strValue = 4; + int64 longValue = 5; + double doubleValue = 6; + bool boolValue = 7; } \ No newline at end of file