fixed telemetry update handling for all keys
This commit is contained in:
parent
fabf6f3235
commit
3f13fbd23f
@ -343,6 +343,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
s -> {
|
s -> {
|
||||||
TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s;
|
TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s;
|
||||||
List<TsKvEntry> updateData = null;
|
List<TsKvEntry> updateData = null;
|
||||||
|
if (sub.isAllKeys()) {
|
||||||
|
if (sub.isLatestValues()) {
|
||||||
|
for (TsKvEntry kv : data) {
|
||||||
|
if (!sub.getKeyStates().containsKey((kv.getKey())) || kv.getTs() > sub.getKeyStates().get(kv.getKey())) {
|
||||||
|
if (updateData == null) {
|
||||||
|
updateData = new ArrayList<>();
|
||||||
|
}
|
||||||
|
updateData.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
updateData = data;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
for (TsKvEntry kv : data) {
|
for (TsKvEntry kv : data) {
|
||||||
if (sub.getKeyStates().containsKey((kv.getKey()))) {
|
if (sub.getKeyStates().containsKey((kv.getKey()))) {
|
||||||
if (!sub.isLatestValues() || kv.getTs() > sub.getKeyStates().get(kv.getKey())) {
|
if (!sub.isLatestValues() || kv.getTs() > sub.getKeyStates().get(kv.getKey())) {
|
||||||
@ -353,6 +367,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (updateData != null) {
|
if (updateData != null) {
|
||||||
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData);
|
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(sub.getSubscriptionId(), updateData);
|
||||||
update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value));
|
update.getLatestValues().forEach((key, value) -> sub.getKeyStates().put(key, value));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user