Merge branch 'feature/entity-data-query' of github.com:thingsboard/thingsboard into feature/entity-data-query

This commit is contained in:
Igor Kulikov 2020-06-24 11:32:04 +03:00
commit 32fcfc7a7f
5 changed files with 98 additions and 57 deletions

View File

@ -171,11 +171,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
if (ctx != null) { if (ctx != null) {
log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd); log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null) { if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) {
Collection<Integer> oldSubIds = ctx.clearSubscriptions(); Collection<Integer> oldSubIds = ctx.clearSubscriptions();
oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId)); oldSubIds.forEach(subId -> localSubscriptionService.cancelSubscription(serviceId, subId));
} }
//TODO: cleanup old subscription;
} else { } else {
log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd); log.debug("[{}][{}] Creating new subscription using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
ctx = createSubCtx(session, cmd); ctx = createSubCtx(session, cmd);

View File

@ -37,10 +37,11 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer; import java.util.Optional;
@Slf4j @Slf4j
@Data @Data
@ -145,12 +146,7 @@ public class TbEntityDataSubCtx {
.subscriptionId(subIdx) .subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityData.getEntityId()) .entityId(entityData.getEntityId())
.updateConsumer(new BiConsumer<String, SubscriptionUpdate>() { .updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues))
@Override
public void accept(String sessionId, SubscriptionUpdate subscriptionUpdate) {
sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues);
}
})
.allKeys(false) .allKeys(false)
.keyStates(keyStates) .keyStates(keyStates)
.build(); .build();
@ -179,6 +175,17 @@ public class TbEntityDataSubCtx {
EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId()); EntityId entityId = subToEntityIdMap.get(subscriptionUpdate.getSubscriptionId());
if (entityId != null) { if (entityId != null) {
log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); log.trace("[{}][{}][{}][{}] Received subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
if (resultToLatestValues) {
sendLatestWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
} else {
sendTsWsMsg(entityId, sessionId, subscriptionUpdate, keyType);
}
} else {
log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
}
}
private void sendLatestWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
Map<String, TsValue> latestUpdate = new HashMap<>(); Map<String, TsValue> latestUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> { subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0); Object[] data = (Object[]) v.get(0);
@ -206,20 +213,52 @@ public class TbEntityDataSubCtx {
} }
} }
if (!latestUpdate.isEmpty()) { if (!latestUpdate.isEmpty()) {
if (resultToLatestValues) {
Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate); Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
entityData = new EntityData(entityId, latestMap, null); entityData = new EntityData(entityId, latestMap, null);
} else {
Map<String, TsValue[]> tsMap = new HashMap<>();
latestUpdate.forEach((key, tsValue) -> {
tsMap.put(key, new TsValue[]{tsValue});
});
entityData = new EntityData(entityId, null, tsMap);
}
wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
} }
} else { }
log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate);
private void sendTsWsMsg(EntityId entityId, String sessionId, SubscriptionUpdate subscriptionUpdate, EntityKeyType keyType) {
Map<String, List<TsValue>> tsUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0);
tsUpdate.computeIfAbsent(k, key -> new ArrayList<>()).add(new TsValue((Long) data[0], (String) data[1]));
});
EntityData entityData = getDataForEntity(entityId);
if (entityData != null && entityData.getLatest() != null) {
Map<String, TsValue> latestCtxValues = entityData.getLatest().get(keyType);
log.trace("[{}][{}][{}] Going to compare update with {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), latestCtxValues);
if (latestCtxValues != null) {
latestCtxValues.forEach((k, v) -> {
List<TsValue> updateList = tsUpdate.get(k);
if (updateList != null) {
for (TsValue update : new ArrayList<>(updateList)) {
if (update.getTs() < v.getTs()) {
log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
updateList.remove(update);
} else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
updateList.remove(update);
}
if (updateList.isEmpty()) {
tsUpdate.remove(k);
}
}
}
});
//Setting new values
tsUpdate.forEach((k, v) -> {
Optional<TsValue> maxValue = v.stream().max(Comparator.comparingLong(TsValue::getTs));
maxValue.ifPresent(max -> latestCtxValues.put(k, max));
});
}
}
if (!tsUpdate.isEmpty()) {
Map<String, TsValue[]> tsMap = new HashMap<>();
tsUpdate.forEach((key, tsValue) -> tsMap.put(key, tsValue.toArray(new TsValue[tsValue.size()])));
entityData = new EntityData(entityId, null, tsMap);
wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
} }
} }

View File

@ -21,6 +21,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -113,7 +114,9 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
DeviceTypeFilter dtf = new DeviceTypeFilter(); DeviceTypeFilter dtf = new DeviceTypeFilter();
dtf.setDeviceNameFilter("D"); dtf.setDeviceNameFilter("D");
dtf.setDeviceType("default"); dtf.setDeviceType("default");
EntityDataQuery edq = new EntityDataQuery(dtf, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityDataQuery edq = new EntityDataQuery(dtf,
new EntityDataPageLink(1, 0, null, null),
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
EntityHistoryCmd historyCmd = new EntityHistoryCmd(); EntityHistoryCmd historyCmd = new EntityHistoryCmd();
historyCmd.setKeys(Arrays.asList("temperature")); historyCmd.setKeys(Arrays.asList("temperature"));
@ -148,11 +151,11 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
msg = wsClient.waitForReply(); msg = wsClient.waitForReply();
update = mapper.readValue(msg, EntityDataUpdate.class); update = mapper.readValue(msg, EntityDataUpdate.class);
Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(1, update.getCmdId());
pageData = update.getData(); List<EntityData> dataList = update.getUpdate();
Assert.assertNotNull(pageData); Assert.assertNotNull(dataList);
Assert.assertEquals(1, pageData.getData().size()); Assert.assertEquals(1, dataList.size());
Assert.assertEquals(device.getId(), pageData.getData().get(0).getEntityId()); Assert.assertEquals(device.getId(), dataList.get(0).getEntityId());
TsValue[] tsArray = pageData.getData().get(0).getTimeseries().get("temperature"); TsValue[] tsArray = dataList.get(0).getTimeseries().get("temperature");
Assert.assertEquals(3, tsArray.length); Assert.assertEquals(3, tsArray.length);
Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]); Assert.assertEquals(new TsValue(dataPoint1.getTs(), dataPoint1.getValueAsString()), tsArray[0]);
Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]); Assert.assertEquals(new TsValue(dataPoint2.getTs(), dataPoint2.getValueAsString()), tsArray[1]);
@ -223,8 +226,8 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
now = System.currentTimeMillis(); now = System.currentTimeMillis();
TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L)); TsKvEntry dataPoint4 = new BasicTsKvEntry(now, new LongDataEntry("temperature", 45L));
wsClient.registerWaitForUpdate(); wsClient.registerWaitForUpdate();
Thread.sleep(100);
sendTelemetry(device, Arrays.asList(dataPoint4)); sendTelemetry(device, Arrays.asList(dataPoint4));
msg = wsClient.waitForUpdate(); msg = wsClient.waitForUpdate();

View File

@ -26,9 +26,9 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class) @RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({ @ClasspathSuite.ClassnameFilters({
"org.thingsboard.server.controller.sql.WebsocketApiSqlTest", // "org.thingsboard.server.controller.sql.WebsocketApiSqlTest",
// "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest", // "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest",
// "org.thingsboard.server.controller.sql.*Test", "org.thingsboard.server.controller.sql.*Test",
}) })
public class ControllerSqlTestSuite { public class ControllerSqlTestSuite {

View File

@ -7,7 +7,7 @@
</encoder> </encoder>
</appender> </appender>
<logger name="org.thingsboard.server.service.subscription" level="TRACE"/> <!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
<logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/> <logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
<logger name="org.thingsboard.server" level="WARN"/> <logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.springframework" level="WARN"/> <logger name="org.springframework" level="WARN"/>