fixed telemetry update handling for timeseries subscription

This commit is contained in:
dashevchenko 2025-03-25 18:28:30 +02:00
parent 3e350e991b
commit fabf6f3235
5 changed files with 50 additions and 5 deletions

View File

@ -343,11 +343,9 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
s -> { s -> {
TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s; TbTimeSeriesSubscription sub = (TbTimeSeriesSubscription) s;
List<TsKvEntry> updateData = null; List<TsKvEntry> updateData = null;
if (sub.isAllKeys()) { for (TsKvEntry kv : data) {
updateData = data; if (sub.getKeyStates().containsKey((kv.getKey()))) {
} else { if (!sub.isLatestValues() || kv.getTs() > sub.getKeyStates().get(kv.getKey())) {
for (TsKvEntry kv : data) {
if (sub.getKeyStates().containsKey((kv.getKey()))) {
if (updateData == null) { if (updateData == null) {
updateData = new ArrayList<>(); updateData = new ArrayList<>();
} }

View File

@ -123,6 +123,7 @@ public class DefaultWebSocketService implements WebSocketService {
private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!";
private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!";
private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!";
private static final String LATEST_TELEMETRY_SCOPE = "LATEST_TELEMETRY";
private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, WsSessionMetaData> wsSessionsMap = new ConcurrentHashMap<>();
@ -684,6 +685,7 @@ public class DefaultWebSocketService implements WebSocketService {
.queryTs(queryTs) .queryTs(queryTs)
.allKeys(true) .allKeys(true)
.keyStates(subState) .keyStates(subState)
.latestValues(LATEST_TELEMETRY_SCOPE.equals(cmd.getScope()))
.build(); .build();
subLock.lock(); subLock.lock();
@ -739,6 +741,7 @@ public class DefaultWebSocketService implements WebSocketService {
.queryTs(queryTs) .queryTs(queryTs)
.allKeys(false) .allKeys(false)
.keyStates(subState) .keyStates(subState)
.latestValues(LATEST_TELEMETRY_SCOPE.equals(cmd.getScope()))
.build(); .build();
subLock.lock(); subLock.lock();

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.ws.telemetry.sub; package org.thingsboard.server.service.ws.telemetry.sub;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import net.minidev.json.annotate.JsonIgnore;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.service.subscription.SubscriptionErrorCode; import org.thingsboard.server.service.subscription.SubscriptionErrorCode;

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.service.ws.AuthCmd;
import org.thingsboard.server.service.ws.WsCmd; import org.thingsboard.server.service.ws.WsCmd;
import org.thingsboard.server.service.ws.WsCommandsWrapper; import org.thingsboard.server.service.ws.WsCommandsWrapper;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
@ -38,6 +39,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import java.net.URI; import java.net.URI;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
@ -271,6 +273,18 @@ public class TbTestWebSocketClient extends WebSocketClient {
return sendEntityDataQuery(edq); return sendEntityDataQuery(edq);
} }
public JsonNode sendTimeseriesCmd(EntityId entityId, String scope) {
log.warn("sendTimeseriesCmd entityId: {}, scope: {}", entityId, scope);
TimeseriesSubscriptionCmd cmd = new TimeseriesSubscriptionCmd(0, 0, 0, 10, null);
cmd.setEntityId(entityId.getId().toString());
cmd.setEntityType(entityId.getEntityType().toString());
cmd.setCmdId(1);
cmd.setScope(scope);
send(cmd);
String msg = this.waitForReply();
return JacksonUtil.fromString(msg, JsonNode.class);
}
public void send(WsCmd... cmds) { public void send(WsCmd... cmds) {
WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper(); WsCommandsWrapper cmdsWrapper = new WsCommandsWrapper();
cmdsWrapper.setCmds(List.of(cmds)); cmdsWrapper.setCmds(List.of(cmds));

View File

@ -68,11 +68,13 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -584,6 +586,33 @@ public class WebsocketApiTest extends AbstractControllerTest {
Assert.assertNull(msg); Assert.assertNull(msg);
} }
@Test
public void testTimeseriesSubscriptionCmd() throws Exception {
long now = System.currentTimeMillis() - 100;
long lastTs = now - TimeUnit.MINUTES.toMillis(1);
TsKvEntry dataPoint1 = new BasicTsKvEntry(lastTs, new LongDataEntry("temperature", 42L));
sendTelemetry(device, List.of(dataPoint1));
JsonNode update = getWsClient().sendTimeseriesCmd(device.getId(), "LATEST_TELEMETRY");
JsonNode data = update.get("data");
Assert.assertEquals(1, data.size());
Assert.assertEquals(JacksonUtil.newArrayNode().add(lastTs).add("42"), data.get("temperature").get(0));
//Sending update from the past, while latest value has new timestamp;
TsKvEntry dataPoint4 = new BasicTsKvEntry(now - TimeUnit.MINUTES.toMillis(5), new LongDataEntry("temperature", 45L));
getWsClient().registerWaitForUpdate();
sendTelemetry(device, List.of(dataPoint4));
String msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
//Sending duplicate update again
getWsClient().registerWaitForUpdate();
sendTelemetry(device, List.of(dataPoint4));
msg = getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(1));
Assert.assertNull(msg);
}
@Test @Test
public void testEntityDataLatestTsWsCmd() throws Exception { public void testEntityDataLatestTsWsCmd() throws Exception {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();