From 7e0d4df80858be80776bcf0f87bb149e8164218d Mon Sep 17 00:00:00 2001 From: oleg Date: Thu, 11 Jan 2018 18:08:36 +0200 Subject: [PATCH] Attribute subscription --- .../core/plugin/telemetry/SubscriptionManager.java | 9 +++++++-- .../telemetry/handlers/TelemetryRpcMsgHandler.java | 3 ++- .../telemetry/handlers/TelemetryWebsocketMsgHandler.java | 8 ++++---- .../core/plugin/telemetry/sub/Subscription.java | 4 ++++ .../core/plugin/telemetry/sub/SubscriptionState.java | 1 + extensions-core/src/main/proto/telemetry.proto | 1 + 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java index d137e10fbd..bad1678d77 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/SubscriptionManager.java @@ -33,6 +33,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionU import java.util.*; import java.util.function.Function; +import java.util.function.Predicate; /** * @author Andrew Shvayka @@ -174,9 +175,13 @@ public class SubscriptionManager { } public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function> f) { + onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f); + } + + public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, Predicate filter, Function> f) { Set deviceSubscriptions = subscriptionsByEntityId.get(entityId); if (deviceSubscriptions != null) { - deviceSubscriptions.stream().filter(s -> type == s.getType()).forEach(s -> { + deviceSubscriptions.stream().filter(filter).forEach(s -> { String sessionId = s.getWsSessionId(); List subscriptionUpdate = f.apply(s); if (!subscriptionUpdate.isEmpty()) { @@ -206,7 +211,7 @@ public class SubscriptionManager { public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List attributes) { Optional serverAddress = ctx.resolve(entityId); if (!serverAddress.isPresent()) { - onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.ATTRIBUTES, s -> { + onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && scope.equals(s.getScope()), s -> { List subscriptionUpdate = new ArrayList(); for (AttributeKvEntry kv : attributes) { if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java index ba5d61080e..874c480f0d 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRpcMsgHandler.java @@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { } Map statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs)); Subscription subscription = new Subscription( - new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap), + new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()), false, msg.getServerAddress()); subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription); } @@ -127,6 +127,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler { builder.setEntityId(cmd.getEntityId().getId().toString()); builder.setType(cmd.getType().name()); builder.setAllKeys(cmd.isAllKeys()); + builder.setScope(cmd.getScope()); cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build())); ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray())); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 7b0e6d8fcd..6f02c9a5f2 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -131,7 +131,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { keys.forEach(key -> subState.put(key, 0L)); attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope()); subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); } @@ -168,7 +168,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { Map subState = new HashMap<>(attributesData.size()); attributesData.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope()); subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); } @@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); Map subState = new HashMap<>(data.size()); data.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope()); subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); } @@ -262,7 +262,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { Map subState = new HashMap<>(keys.size()); keys.forEach(key -> subState.put(key, startTs)); data.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope()); subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java index 1285cfa903..360b018d14 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/Subscription.java @@ -51,6 +51,10 @@ public class Subscription { return getSub().getType(); } + public String getScope() { + return getSub().getScope(); + } + public boolean isAllKeys() { return getSub().isAllKeys(); } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java index 5e15fda502..7e0a2ba48f 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/sub/SubscriptionState.java @@ -33,6 +33,7 @@ public class SubscriptionState { @Getter private final SubscriptionType type; @Getter private final boolean allKeys; @Getter private final Map keyStates; + @Getter private final String scope; @Override public boolean equals(Object o) { diff --git a/extensions-core/src/main/proto/telemetry.proto b/extensions-core/src/main/proto/telemetry.proto index 2bfef5908b..59c5c14fd5 100644 --- a/extensions-core/src/main/proto/telemetry.proto +++ b/extensions-core/src/main/proto/telemetry.proto @@ -27,6 +27,7 @@ message SubscriptionProto { string type = 5; bool allKeys = 6; repeated SubscriptionKetStateProto keyStates = 7; + string scope = 8; } message SubscriptionUpdateProto {