New API for Agg requests

This commit is contained in:
Andrii Shvaika 2022-09-12 15:54:21 +03:00
parent fa9f1b9f69
commit 7cc2943d18
12 changed files with 176 additions and 31 deletions

View File

@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
@ -30,8 +29,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
@ -52,6 +50,9 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.AggHistoryCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AggKey;
import org.thingsboard.server.service.telemetry.cmd.v2.AggTimeSeriesCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
@ -69,7 +70,6 @@ import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@ -133,6 +133,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private int maxEntitiesPerAlarmSubscription;
@Value("${server.ws.dynamic_page_link.max_alarm_queries_per_refresh_interval:10}")
private int maxAlarmQueriesPerRefreshInterval;
@Value("${ui.dashboard.max_datapoints_limit:50000}")
private int maxDatapointLimit;
private ExecutorService wsCallBackExecutor;
private boolean tsInSqlDB;
@ -167,7 +169,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
TbEntityDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
if (ctx != null) {
log.debug("[{}][{}] Updating existing subscriptions using: {}", session.getSessionId(), cmd.getCmdId(), cmd);
if (cmd.getLatestCmd() != null || cmd.getTsCmd() != null || cmd.getHistoryCmd() != null) {
if (cmd.hasAnyCmd()) {
ctx.clearEntitySubscriptions();
}
} else {
@ -206,6 +208,18 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
finalCtx.setRefreshTask(task);
}
}
if (cmd.getAggHistoryCmd() != null) {
handleAggHistoryCmd(session, ctx, cmd.getAggHistoryCmd());
} else if (cmd.getAggTsCmd() != null) {
handleAggTsCmd(session, ctx, cmd.getAggTsCmd());
} else if (cmd.hasRegularCmds()) {
handleRegularCommands(session, ctx, cmd);
} else {
checkAndSendInitialData(ctx);
}
}
private void handleRegularCommands(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, EntityDataCmd cmd) {
ListenableFuture<TbEntityDataSubCtx> historyFuture;
if (cmd.getHistoryCmd() != null) {
log.trace("[{}][{}] Going to process history command: {}", session.getSessionId(), cmd.getCmdId(), cmd.getHistoryCmd());
@ -229,10 +243,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
if (cmd.getTsCmd() != null) {
handleTimeSeriesCmd(theCtx, cmd.getTsCmd());
}
} else if (!theCtx.isInitialDataSent()) {
EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription());
theCtx.sendWsMsg(update);
theCtx.setInitialDataSent(true);
} else {
checkAndSendInitialData(theCtx);
}
} catch (RuntimeException e) {
handleWsCmdRuntimeException(theCtx.getSessionId(), e, cmd);
@ -246,6 +258,94 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}, wsCallBackExecutor);
}
private void checkAndSendInitialData(@Nullable TbEntityDataSubCtx theCtx) {
if (!theCtx.isInitialDataSent()) {
EntityDataUpdate update = new EntityDataUpdate(theCtx.getCmdId(), theCtx.getData(), null, theCtx.getMaxEntitiesPerDataSubscription());
theCtx.sendWsMsg(update);
theCtx.setInitialDataSent(true);
}
}
private void handleAggHistoryCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, AggHistoryCmd cmd) {
var keys = cmd.getKeys();
long interval = cmd.getEndTs() - cmd.getStartTs();
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(
key.getKey(), cmd.getStartTs(), cmd.getEndTs(), interval, 1, key.getAgg()
)).distinct().collect(Collectors.toList());
handleAggCmd(session, ctx, cmd.getKeys(), queries, cmd.getStartTs(), cmd.getEndTs(), false, false);
}
private void handleAggTsCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, AggTimeSeriesCmd cmd) {
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
List<ReadTsKvQuery> queries = cmd.getKeys().stream().map(key -> {
if (cmd.isFloating()) {
return new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), endTs, cmd.getTimeWindow(), getLimit(maxDatapointLimit), Aggregation.NONE);
} else {
return new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), endTs, cmd.getTimeWindow(), 1, key.getAgg());
}
}).distinct().collect(Collectors.toList());
handleAggCmd(session, ctx, cmd.getKeys(), queries, cmd.getStartTs(), endTs, cmd.isFloating(), true);
}
private void handleAggCmd(TelemetryWebSocketSessionRef session, TbEntityDataSubCtx ctx, List<AggKey> keys, List<ReadTsKvQuery> queries,
long startTs, long endTs, boolean floating, boolean subscribe) {
Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchResultMap = new HashMap<>();
List<EntityData> entityDataList = ctx.getData().getData();
entityDataList.forEach(entityData -> fetchResultMap.put(entityData,
tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), queries)));
Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
// Map that holds last ts for each key for each entity.
Map<EntityData, Map<String, Long>> lastTsEntityMap = new HashMap<>();
fetchResultMap.forEach((entityData, future) -> {
try {
Map<String, Long> lastTsMap = new HashMap<>();
lastTsEntityMap.put(entityData, lastTsMap);
List<ReadTsKvQueryResult> queryResults = future.get();
if (queryResults != null) {
for (ReadTsKvQueryResult queryResult : queryResults) {
if (floating) {
entityData.getAggFloating().put(queryResult.getKey(), queryResult.toTsValues());
} else {
entityData.getAggLatest().computeIfAbsent(queryResult.getAgg(), agg -> new HashMap<>()).put(queryResult.getKey(), queryResult.toTsValue());
}
lastTsMap.put(queryResult.getKey(), queryResult.getLastEntryTs());
}
}
// Populate with empty values if no data found.
keys.forEach(key -> {
if (floating) {
entityData.getAggFloating().putIfAbsent(key.getKey(), new TsValue[]{TsValue.EMPTY});
} else {
entityData.getAggLatest().computeIfAbsent(key.getAgg(), agg -> new HashMap<>()).putIfAbsent(key.getKey(), TsValue.EMPTY);
}
});
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
ctx.sendWsMsg(new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
}
});
ctx.getWsLock().lock();
try {
EntityDataUpdate update;
if (!ctx.isInitialDataSent()) {
update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
ctx.setInitialDataSent(true);
} else {
update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription());
}
if (subscribe) {
ctx.createTimeSeriesSubscriptions(lastTsEntityMap, startTs, endTs, true);
}
ctx.sendWsMsg(update);
entityDataList.forEach(ed -> ed.getTimeseries().clear());
} finally {
ctx.getWsLock().unlock();
}
return ctx;
}, wsCallBackExecutor);
}
private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) {
log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e);
wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED);
@ -420,12 +520,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) {
List<String> keys = cmd.getKeys();
List<ReadTsKvQuery> finalTsKvQueryList;
List<ReadTsKvQuery> tsKvQueryList = cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
List<ReadTsKvQuery> tsKvQueryList = keys.stream().map(key -> new BaseReadTsKvQuery(
key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
)).collect(Collectors.toList());
if (cmd.isFetchLatestPreviousPoint()) {
finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
finalTsKvQueryList.addAll(cmd.getKeys().stream().map(key -> new BaseReadTsKvQuery(
finalTsKvQueryList.addAll(keys.stream().map(key -> new BaseReadTsKvQuery(
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
)).collect(Collectors.toList()));
} else {
@ -451,7 +551,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
}
// Populate with empty values if no data found.
cmd.getKeys().forEach(key -> {
keys.forEach(key -> {
if (!entityData.getTimeseries().containsKey(key)) {
entityData.getTimeseries().put(key, new TsValue[0]);
}
@ -475,7 +575,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
update = new EntityDataUpdate(ctx.getCmdId(), null, entityDataList, ctx.getMaxEntitiesPerDataSubscription());
}
if (subscribe) {
ctx.createTimeseriesSubscriptions(lastTsEntityMap, cmd.getStartTs(), cmd.getEndTs());
ctx.createTimeSeriesSubscriptions(lastTsEntityMap, cmd.getStartTs(), cmd.getEndTs());
}
ctx.sendWsMsg(update);
entityDataList.forEach(ed -> ed.getTimeseries().clear());
@ -546,11 +646,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
ctx.getWsLock().lock();
try {
ctx.createLatestValuesSubscriptions(latestCmd.getKeys());
if (!ctx.isInitialDataSent()) {
EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null, ctx.getMaxEntitiesPerDataSubscription());
ctx.sendWsMsg(update);
ctx.setInitialDataSent(true);
}
checkAndSendInitialData(ctx);
} finally {
ctx.getWsLock().unlock();
}

View File

@ -122,12 +122,16 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
createSubscriptions(keys, true, 0, 0);
}
public void createTimeseriesSubscriptions(Map<EntityData, Map<String, Long>> entityKeyStates, long startTs, long endTs) {
public void createTimeSeriesSubscriptions(Map<EntityData, Map<String, Long>> entityKeyStates, long startTs, long endTs) {
createTimeSeriesSubscriptions(entityKeyStates, startTs, endTs, false);
}
public void createTimeSeriesSubscriptions(Map<EntityData, Map<String, Long>> entityKeyStates, long startTs, long endTs, boolean resultToLatestValues) {
entityKeyStates.forEach((entityData, keyStates) -> {
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
subToEntityIdMap.put(subIdx, entityData.getEntityId());
localSubscriptionService.addSubscription(
createTsSub(entityData, subIdx, false, startTs, endTs, keyStates));
createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues));
});
}
@ -200,6 +204,10 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
}
private TbTimeseriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map<String, Long> keyStates) {
return createTsSub(entityData, subIdx, latestValues, startTs, endTs, keyStates, latestValues);
}
private TbTimeseriesSubscription createTsSub(EntityData entityData, int subIdx, boolean latestValues, long startTs, long endTs, Map<String, Long> keyStates, boolean resultToLatestValues) {
log.trace("[{}][{}][{}] Creating time-series subscription for [{}] with keys: {}", serviceId, cmdId, subIdx, entityData.getEntityId(), keyStates);
return TbTimeseriesSubscription.builder()
.serviceId(serviceId)
@ -207,7 +215,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
.subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityData.getEntityId())
.updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, latestValues))
.updateConsumer((sessionId, subscriptionUpdate) -> sendWsMsg(sessionId, subscriptionUpdate, EntityKeyType.TIME_SERIES, resultToLatestValues))
.allKeys(false)
.keyStates(keyStates)
.latestValues(latestValues)

View File

@ -716,7 +716,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
"Cmd id is negative value!");
sendWsMsg(sessionRef, update);
return false;
} else if (cmd.getQuery() == null && cmd.getLatestCmd() == null && cmd.getHistoryCmd() == null && cmd.getTsCmd() == null) {
} else if (cmd.getQuery() == null && !cmd.hasAnyCmd()) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
"Query is empty!");
sendWsMsg(sessionRef, update);

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import org.thingsboard.server.common.data.query.EntityDataQuery;
@ -55,4 +56,22 @@ public class EntityDataCmd extends DataCmd {
this.aggHistoryCmd = aggHistoryCmd;
this.aggTsCmd = aggTsCmd;
}
@JsonIgnore
public boolean hasAnyCmd() {
return historyCmd != null || latestCmd != null || tsCmd != null || aggHistoryCmd != null || aggTsCmd != null;
}
@JsonIgnore
public boolean hasRegularCmds() {
return historyCmd != null || latestCmd != null || tsCmd != null;
}
@JsonIgnore
public boolean hasAggCmds() {
return aggHistoryCmd != null || aggTsCmd != null;
}
}

View File

@ -25,12 +25,13 @@ import java.util.List;
public class ReadTsKvQueryResult {
private final String key;
// Holds the aggregation from the query
private final Aggregation agg;
// Holds the data list;
private final List<TsKvEntry> data;
// Holds the max ts of the records that match aggregation intervals (not the ts of the aggregation window, but the ts of the last record among all the intervals)
private final long lastEntryTs;
public TsValue[] toTsValues() {
if (data != null && !data.isEmpty()) {
List<TsValue> queryValues = new ArrayList<>();
@ -43,4 +44,14 @@ public class ReadTsKvQueryResult {
}
}
public TsValue toTsValue() {
if (data == null || data.isEmpty()) {
return TsValue.EMPTY;
}
if (data.size() > 1) {
throw new RuntimeException("Query Result has multiple data points!");
}
return data.get(0).toTsValue();
}
}

View File

@ -15,16 +15,25 @@
*/
package org.thingsboard.server.common.data.query;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import java.util.Map;
@Data
@RequiredArgsConstructor
public class EntityData {
private final EntityId entityId;
private final Map<EntityKeyType, Map<String, TsValue>> latest;
private final Map<String, TsValue[]> timeseries;
private final Map<Aggregation, Map<String, TsValue>> aggLatest;
private final Map<String, TsValue[]> aggFloating;
public EntityData(EntityId entityId, Map<EntityKeyType, Map<String, TsValue>> latest, Map<String, TsValue[]> timeseries) {
this(entityId, latest, timeseries, null, null);
}
}

View File

@ -24,6 +24,8 @@ import lombok.RequiredArgsConstructor;
@JsonInclude(JsonInclude.Include.NON_NULL)
public class TsValue {
public static final TsValue EMPTY = new TsValue(0, "");
private final long ts;
private final String value;
private final Long count;

View File

@ -54,8 +54,8 @@ public class EntityDataAdapter {
EntityType entityType = EntityType.valueOf((String) row.get("entity_type"));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, id);
Map<EntityKeyType, Map<String, TsValue>> latest = new HashMap<>();
Map<String, TsValue[]> timeseries = new HashMap<>();
EntityData entityData = new EntityData(entityId, latest, timeseries);
//Maybe avoid empty hashmaps?
EntityData entityData = new EntityData(entityId, latest, new HashMap<>(), new HashMap<>(), new HashMap<>());
for (EntityKeyMapping mapping : selectionMapping) {
if (!mapping.isIgnore()) {
EntityKey entityKey = mapping.getEntityKey();

View File

@ -149,7 +149,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
List<TsKvEntry> tsKvEntries = DaoUtil.convertDataList(tsKvEntities);
long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
}
Optional<TsKvEntity> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {

View File

@ -99,7 +99,7 @@ public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeni
if (lastTs.isEmpty()) {
lastTs = data.stream().map(AbstractTsKvEntity::getTs).filter(Objects::nonNull).max(Long::compare);
}
return new ReadTsKvQueryResult(query.getKey(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs()));
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs()));
}
}, service);
}

View File

@ -181,7 +181,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey));
var tsKvEntries = DaoUtil.convertDataList(timescaleTsKvEntities);
long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
}
private List<Optional<? extends AbstractTsKvEntity>> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {

View File

@ -285,7 +285,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public ReadTsKvQueryResult apply(@Nullable List<Optional<TsKvEntryAggWrapper>> input) {
if (input == null) {
return new ReadTsKvQueryResult(query.getKey(), Collections.emptyList(), query.getStartTs());
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), Collections.emptyList(), query.getStartTs());
} else {
long maxTs = query.getStartTs();
List<TsKvEntry> data = new ArrayList<>();
@ -296,7 +296,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
data.add(tsKvEntryAggWrapper.getEntry());
}
}
return new ReadTsKvQueryResult(query.getKey(), data, maxTs);
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), data, maxTs);
}
}
@ -333,7 +333,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
if (tsKvEntries != null) {
lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
}
return new ReadTsKvQueryResult(query.getKey(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
}, MoreExecutors.directExecutor());
}