Merge pull request #13025 from dashevchenko/timeseriesSubFix
[Telemetry] WS Subscription: Older telemetry overwrites newer data on UI
This commit is contained in:
commit
d0707d5c03
@ -343,15 +343,31 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
s -> {
|
||||
TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s;
|
||||
List<TsKvEntry> updateData = null;
|
||||
Map<String, Long> keyStates = sub.getKeyStates();
|
||||
if (sub.isAllKeys()) {
|
||||
updateData = data;
|
||||
if (sub.isLatestValues()) {
|
||||
for (TsKvEntry kv : data) {
|
||||
Long stateTs = keyStates.get(kv.getKey());
|
||||
if (stateTs == null || kv.getTs() > stateTs) {
|
||||
if (updateData == null) {
|
||||
updateData = new ArrayList<>();
|
||||
}
|
||||
updateData.add(kv);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
updateData = data;
|
||||
}
|
||||
} else {
|
||||
for (TsKvEntry kv : data) {
|
||||
if (sub.getKeyStates().containsKey((kv.getKey()))) {
|
||||
if (updateData == null) {
|
||||
updateData = new ArrayList<>();
|
||||
Long stateTs = keyStates.get(kv.getKey());
|
||||
if (stateTs != null) {
|
||||
if (!sub.isLatestValues() || kv.getTs() > stateTs) {
|
||||
if (updateData == null) {
|
||||
updateData = new ArrayList<>();
|
||||
}
|
||||
updateData.add(kv);
|
||||
}
|
||||
updateData.add(kv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,6 +105,8 @@ import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.LATEST_TELEMETRY_SCOPE;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 27.03.18.
|
||||
*/
|
||||
@ -667,24 +669,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||
|
||||
Lock subLock = new ReentrantLock();
|
||||
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
|
||||
.serviceId(serviceId)
|
||||
.sessionId(sessionId)
|
||||
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
|
||||
.tenantId(sessionRef.getSecurityCtx().getTenantId())
|
||||
.entityId(entityId)
|
||||
.updateProcessor((subscription, update) -> {
|
||||
subLock.lock();
|
||||
try {
|
||||
sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
|
||||
} finally {
|
||||
subLock.unlock();
|
||||
}
|
||||
})
|
||||
.queryTs(queryTs)
|
||||
.allKeys(true)
|
||||
.keyStates(subState)
|
||||
.build();
|
||||
TbTimeSeriesSubscription sub = getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, true);
|
||||
|
||||
subLock.lock();
|
||||
try {
|
||||
@ -712,6 +697,28 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
on(r -> Futures.addCallback(tsService.findAllLatest(sessionRef.getSecurityCtx().getTenantId(), entityId), callback, executor), callback::onFailure));
|
||||
}
|
||||
|
||||
private TbTimeSeriesSubscription getTsSubscription(Map<String, Long> subState, Lock subLock, String sessionId, WebSocketSessionRef sessionRef, TimeseriesSubscriptionCmd cmd, EntityId entityId, long queryTs, boolean allKeys) {
|
||||
return TbTimeSeriesSubscription.builder()
|
||||
.serviceId(serviceId)
|
||||
.sessionId(sessionId)
|
||||
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
|
||||
.tenantId(sessionRef.getSecurityCtx().getTenantId())
|
||||
.entityId(entityId)
|
||||
.updateProcessor((subscription, update) -> {
|
||||
subLock.lock();
|
||||
try {
|
||||
sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
|
||||
} finally {
|
||||
subLock.unlock();
|
||||
}
|
||||
})
|
||||
.queryTs(queryTs)
|
||||
.allKeys(allKeys)
|
||||
.keyStates(subState)
|
||||
.latestValues(LATEST_TELEMETRY_SCOPE.equals(cmd.getScope()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private FutureCallback<List<TsKvEntry>> getSubscriptionCallback(final WebSocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd,
|
||||
final String sessionId, final EntityId entityId, final long queryTs, final long startTs, final List<String> keys) {
|
||||
return new FutureCallback<>() {
|
||||
@ -722,24 +729,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
|
||||
|
||||
Lock subLock = new ReentrantLock();
|
||||
TbTimeSeriesSubscription sub = TbTimeSeriesSubscription.builder()
|
||||
.serviceId(serviceId)
|
||||
.sessionId(sessionId)
|
||||
.subscriptionId(registerNewSessionSubId(sessionId, sessionRef, cmd.getCmdId()))
|
||||
.tenantId(sessionRef.getSecurityCtx().getTenantId())
|
||||
.entityId(entityId)
|
||||
.updateProcessor((subscription, update) -> {
|
||||
subLock.lock();
|
||||
try {
|
||||
sendUpdate(subscription.getSessionId(), cmd.getCmdId(), update);
|
||||
} finally {
|
||||
subLock.unlock();
|
||||
}
|
||||
})
|
||||
.queryTs(queryTs)
|
||||
.allKeys(false)
|
||||
.keyStates(subState)
|
||||
.build();
|
||||
TbTimeSeriesSubscription sub = getTsSubscription(subState, subLock, sessionId, sessionRef, cmd, entityId, queryTs, false);
|
||||
|
||||
subLock.lock();
|
||||
try {
|
||||
|
||||
@ -31,6 +31,7 @@ import org.thingsboard.server.service.ws.AuthCmd;
|
||||
import org.thingsboard.server.service.ws.WsCmd;
|
||||
import org.thingsboard.server.service.ws.WsCommandsWrapper;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
|
||||
@ -38,6 +39,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.channels.NotYetConnectedException;
|
||||
@ -271,6 +273,18 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
||||
return sendEntityDataQuery(edq);
|
||||
}
|
||||
|
||||
public JsonNode sendTimeseriesCmd(EntityId entityId, String scope) {
|
||||
log.warn("sendTimeseriesCmd entityId: {}, scope: {}", entityId, scope);
|
||||
TimeseriesSubscriptionCmd cmd = new TimeseriesSubscriptionCmd(0, 0, 0, 10, null);
|
||||
cmd.setEntityId(entityId.getId().toString());
|
||||
cmd.setEntityType(entityId.getEntityType().toString());
|
||||
cmd.setCmdId(1);
|
||||
cmd.setScope(scope);
|
||||
send(cmd);
|
||||
String msg = this.waitForReply();
|
||||
return JacksonUtil.fromString(msg, JsonNode.class);
|
||||
}
|
||||
|
||||
public void send(WsCmd... cmds) {
|
||||
WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
|
||||
cmdsWrapper.setCmds(List.of(cmds));
|
||||
|
||||
@ -68,11 +68,13 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -584,6 +586,33 @@ public class WebsocketApiTest extends AbstractControllerTest {
|
||||
Assert.assertNull(msg);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeseriesSubscriptionCmd() throws Exception {
|
||||
long now = System.currentTimeMillis() - 100;
|
||||
|
||||
long lastTs = now - TimeUnit.MINUTES.toMillis(1);
|
||||
TsKvEntry dataPoint1 = new BasicTsKvEntry(lastTs, new LongDataEntry("temperature", 42L));
|
||||
sendTelemetry(device, List.of(dataPoint1));
|
||||
|
||||
JsonNode update = getWsClient().sendTimeseriesCmd(device.getId(), "LATEST_TELEMETRY");
|
||||
JsonNode data = update.get("data");
|
||||
Assert.assertEquals(1, data.size());
|
||||
Assert.assertEquals(JacksonUtil.newArrayNode().add(lastTs).add("42"), data.get("temperature").get(0));
|
||||
|
||||
//Sending update from the past, while latest value has new timestamp;
|
||||
TsKvEntry dataPoint4 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(5), new LongDataEntry("temperature", 45L));
|
||||
getWsClient().registerWaitForUpdate();
|
||||
sendTelemetry(device, List.of(dataPoint4));
|
||||
String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
|
||||
Assert.assertNull(msg);
|
||||
|
||||
//Sending duplicate update again
|
||||
getWsClient().registerWaitForUpdate();
|
||||
sendTelemetry(device, List.of(dataPoint4));
|
||||
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
|
||||
Assert.assertNull(msg);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEntityDataLatestTsWsCmd() throws Exception {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
@ -148,4 +148,6 @@ public class DataConstants {
|
||||
public static final String CF_QUEUE_NAME = "CalculatedFields";
|
||||
public static final String CF_STATES_QUEUE_NAME = "CalculatedFieldStates";
|
||||
|
||||
public static final String LATEST_TELEMETRY_SCOPE = "LATEST_TELEMETRY";
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user