commit
39d0cf7e7c
@ -33,6 +33,7 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionU
|
|||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -174,9 +175,13 @@ public class SubscriptionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
|
public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, SubscriptionType type, Function<Subscription, List<TsKvEntry>> f) {
|
||||||
|
onLocalSubscriptionUpdate(ctx, entityId, s -> type == s.getType(), f);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onLocalSubscriptionUpdate(PluginContext ctx, EntityId entityId, Predicate<Subscription> filter, Function<Subscription, List<TsKvEntry>> f) {
|
||||||
Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
|
Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.get(entityId);
|
||||||
if (deviceSubscriptions != null) {
|
if (deviceSubscriptions != null) {
|
||||||
deviceSubscriptions.stream().filter(s -> type == s.getType()).forEach(s -> {
|
deviceSubscriptions.stream().filter(filter).forEach(s -> {
|
||||||
String sessionId = s.getWsSessionId();
|
String sessionId = s.getWsSessionId();
|
||||||
List<TsKvEntry> subscriptionUpdate = f.apply(s);
|
List<TsKvEntry> subscriptionUpdate = f.apply(s);
|
||||||
if (!subscriptionUpdate.isEmpty()) {
|
if (!subscriptionUpdate.isEmpty()) {
|
||||||
@ -206,7 +211,7 @@ public class SubscriptionManager {
|
|||||||
public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
public void onAttributesUpdateFromServer(PluginContext ctx, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
||||||
Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
|
Optional<ServerAddress> serverAddress = ctx.resolve(entityId);
|
||||||
if (!serverAddress.isPresent()) {
|
if (!serverAddress.isPresent()) {
|
||||||
onLocalSubscriptionUpdate(ctx, entityId, SubscriptionType.ATTRIBUTES, s -> {
|
onLocalSubscriptionUpdate(ctx, entityId, s -> SubscriptionType.ATTRIBUTES == s.getType() && scope.equals(s.getScope()), s -> {
|
||||||
List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
|
List<TsKvEntry> subscriptionUpdate = new ArrayList<TsKvEntry>();
|
||||||
for (AttributeKvEntry kv : attributes) {
|
for (AttributeKvEntry kv : attributes) {
|
||||||
if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
|
if (s.isAllKeys() || s.getKeyStates().containsKey(kv.getKey())) {
|
||||||
|
|||||||
@ -114,7 +114,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
|
|||||||
}
|
}
|
||||||
Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
|
Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect(Collectors.toMap(SubscriptionKetStateProto::getKey, SubscriptionKetStateProto::getTs));
|
||||||
Subscription subscription = new Subscription(
|
Subscription subscription = new Subscription(
|
||||||
new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap),
|
new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), SubscriptionType.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()),
|
||||||
false, msg.getServerAddress());
|
false, msg.getServerAddress());
|
||||||
subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
|
subscriptionManager.addRemoteWsSubscription(ctx, msg.getServerAddress(), proto.getSessionId(), subscription);
|
||||||
}
|
}
|
||||||
@ -127,6 +127,7 @@ public class TelemetryRpcMsgHandler implements RpcMsgHandler {
|
|||||||
builder.setEntityId(cmd.getEntityId().getId().toString());
|
builder.setEntityId(cmd.getEntityId().getId().toString());
|
||||||
builder.setType(cmd.getType().name());
|
builder.setType(cmd.getType().name());
|
||||||
builder.setAllKeys(cmd.isAllKeys());
|
builder.setAllKeys(cmd.isAllKeys());
|
||||||
|
builder.setScope(cmd.getScope());
|
||||||
cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
|
cmd.getKeyStates().entrySet().forEach(e -> builder.addKeyStates(SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build()));
|
||||||
ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
|
ctx.sendPluginRpcMsg(new RpcMsg(address, SUBSCRIPTION_CLAZZ, builder.build().toByteArray()));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -131,7 +131,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
keys.forEach(key -> subState.put(key, 0L));
|
keys.forEach(key -> subState.put(key, 0L));
|
||||||
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||||
|
|
||||||
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState);
|
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, false, subState, cmd.getScope());
|
||||||
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,7 +168,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
Map<String, Long> subState = new HashMap<>(attributesData.size());
|
Map<String, Long> subState = new HashMap<>(attributesData.size());
|
||||||
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||||
|
|
||||||
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState);
|
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.ATTRIBUTES, true, subState, cmd.getScope());
|
||||||
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -234,7 +234,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
|
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
|
||||||
Map<String, Long> subState = new HashMap<>(data.size());
|
Map<String, Long> subState = new HashMap<>(data.size());
|
||||||
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||||
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState);
|
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, true, subState, cmd.getScope());
|
||||||
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,7 +262,7 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
Map<String, Long> subState = new HashMap<>(keys.size());
|
Map<String, Long> subState = new HashMap<>(keys.size());
|
||||||
keys.forEach(key -> subState.put(key, startTs));
|
keys.forEach(key -> subState.put(key, startTs));
|
||||||
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||||
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState);
|
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), entityId, SubscriptionType.TIMESERIES, false, subState, cmd.getScope());
|
||||||
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
subscriptionManager.addLocalWsSubscription(ctx, sessionId, entityId, sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -51,6 +51,10 @@ public class Subscription {
|
|||||||
return getSub().getType();
|
return getSub().getType();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getScope() {
|
||||||
|
return getSub().getScope();
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isAllKeys() {
|
public boolean isAllKeys() {
|
||||||
return getSub().isAllKeys();
|
return getSub().isAllKeys();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,6 +33,7 @@ public class SubscriptionState {
|
|||||||
@Getter private final SubscriptionType type;
|
@Getter private final SubscriptionType type;
|
||||||
@Getter private final boolean allKeys;
|
@Getter private final boolean allKeys;
|
||||||
@Getter private final Map<String, Long> keyStates;
|
@Getter private final Map<String, Long> keyStates;
|
||||||
|
@Getter private final String scope;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
|
|||||||
@ -27,6 +27,7 @@ message SubscriptionProto {
|
|||||||
string type = 5;
|
string type = 5;
|
||||||
bool allKeys = 6;
|
bool allKeys = 6;
|
||||||
repeated SubscriptionKetStateProto keyStates = 7;
|
repeated SubscriptionKetStateProto keyStates = 7;
|
||||||
|
string scope = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SubscriptionUpdateProto {
|
message SubscriptionUpdateProto {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user