Support of Comparison Queries

This commit is contained in:
Andrii Shvaika 2022-09-14 14:22:43 +03:00
parent 6879021027
commit 9d30b7558f
14 changed files with 131 additions and 37 deletions

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
@ -74,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -268,28 +270,36 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
private ListenableFuture<TbEntityDataSubCtx> handleAggHistoryCmd(TbEntityDataSubCtx ctx, AggHistoryCmd cmd) {
var keys = cmd.getKeys();
long interval = cmd.getEndTs() - cmd.getStartTs();
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(
key.getKey(), cmd.getStartTs(), cmd.getEndTs(), interval, 1, key.getAgg()
)).distinct().collect(Collectors.toList());
ConcurrentMap<Integer, ReadTsKvQueryInfo> queries = new ConcurrentHashMap<>();
for (AggKey key : cmd.getKeys()) {
if (key.getPreviousValueOnly() == null || !key.getPreviousValueOnly()) {
var query = new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), cmd.getEndTs(), cmd.getEndTs() - cmd.getStartTs(), 1, key.getAgg());
queries.put(query.getId(), new ReadTsKvQueryInfo(key, query, false));
}
if (key.getPreviousStartTs() != null && key.getPreviousEndTs() != null && key.getPreviousEndTs() >= key.getPreviousStartTs()) {
var query = new BaseReadTsKvQuery(key.getKey(), key.getPreviousStartTs(), key.getPreviousEndTs(), key.getPreviousEndTs() - key.getPreviousStartTs(), 1, key.getAgg());
queries.put(query.getId(), new ReadTsKvQueryInfo(key, query, true));
}
}
return handleAggCmd(ctx, cmd.getKeys(), queries, cmd.getStartTs(), cmd.getEndTs(), false);
}
private ListenableFuture<TbEntityDataSubCtx> handleAggTsCmd(TbEntityDataSubCtx ctx, AggTimeSeriesCmd cmd) {
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
List<ReadTsKvQuery> queries = cmd.getKeys().stream()
.map(key -> new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), endTs, cmd.getTimeWindow(), 1, key.getAgg()))
.distinct().collect(Collectors.toList());
return handleAggCmd(ctx, cmd.getKeys(), queries, cmd.getStartTs(), endTs, true);
ConcurrentMap<Integer, ReadTsKvQueryInfo> queries = new ConcurrentHashMap<>();
for (AggKey key : cmd.getKeys()) {
var query = new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), cmd.getStartTs() + cmd.getTimeWindow(), cmd.getTimeWindow(), 1, key.getAgg());
queries.put(query.getId(), new ReadTsKvQueryInfo(key, query, false));
}
return handleAggCmd(ctx, cmd.getKeys(), queries, cmd.getStartTs(), cmd.getStartTs() + cmd.getTimeWindow(), true);
}
private ListenableFuture<TbEntityDataSubCtx> handleAggCmd(TbEntityDataSubCtx ctx, List<AggKey> keys, List<ReadTsKvQuery> queries,
private ListenableFuture<TbEntityDataSubCtx> handleAggCmd(TbEntityDataSubCtx ctx, List<AggKey> keys, ConcurrentMap<Integer, ReadTsKvQueryInfo> queries,
long startTs, long endTs, boolean subscribe) {
Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchResultMap = new HashMap<>();
List<EntityData> entityDataList = ctx.getData().getData();
List<ReadTsKvQuery> queryList = queries.values().stream().map(ReadTsKvQueryInfo::getQuery).collect(Collectors.toList());
entityDataList.forEach(entityData -> fetchResultMap.put(entityData,
tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), queries)));
tsService.findAllByQueries(ctx.getTenantId(), entityData.getEntityId(), queryList)));
return Futures.transform(Futures.allAsList(fetchResultMap.values()), f -> {
// Map that holds last ts for each key for each entity.
Map<EntityData, Map<String, Long>> lastTsEntityMap = new HashMap<>();
@ -301,13 +311,19 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
List<ReadTsKvQueryResult> queryResults = future.get();
if (queryResults != null) {
for (ReadTsKvQueryResult queryResult : queryResults) {
entityData.getAggLatest().computeIfAbsent(queryResult.getAgg(), agg -> new HashMap<>()).put(queryResult.getKey(), queryResult.toTsValue());
lastTsMap.put(queryResult.getKey(), queryResult.getLastEntryTs());
ReadTsKvQueryInfo queryInfo = queries.get(queryResult.getQueryId());
ComparisonTsValue comparisonTsValue = entityData.getAggLatest().computeIfAbsent(queryInfo.getKey().getId(), agg -> new ComparisonTsValue());
if (queryInfo.isPrevious()) {
comparisonTsValue.setPrevious(queryResult.toTsValue());
} else {
comparisonTsValue.setCurrent(queryResult.toTsValue());
lastTsMap.put(queryInfo.getQuery().getKey(), queryResult.getLastEntryTs());
}
}
}
// Populate with empty values if no data found.
keys.forEach(key -> {
entityData.getAggLatest().computeIfAbsent(key.getAgg(), agg -> new HashMap<>()).putIfAbsent(key.getKey(), TsValue.EMPTY);
entityData.getAggLatest().putIfAbsent(key.getId(), new ComparisonTsValue(TsValue.EMPTY, TsValue.EMPTY));
});
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}][{}][{}] Failed to fetch historical data", ctx.getSessionId(), ctx.getCmdId(), entityData.getEntityId(), e);
@ -507,16 +523,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
private ListenableFuture<TbEntityDataSubCtx> handleGetTsCmd(TbEntityDataSubCtx ctx, GetTsCmd cmd, boolean subscribe) {
Map<Integer, String> queriesKeys = new ConcurrentHashMap<>();
List<String> keys = cmd.getKeys();
List<ReadTsKvQuery> finalTsKvQueryList;
List<ReadTsKvQuery> tsKvQueryList = keys.stream().map(key -> new BaseReadTsKvQuery(
key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
)).collect(Collectors.toList());
List<ReadTsKvQuery> tsKvQueryList = keys.stream().map(key -> {
var query = new BaseReadTsKvQuery(
key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), cmd.getAgg()
);
queriesKeys.put(query.getId(), query.getKey());
return query;
}).collect(Collectors.toList());
if (cmd.isFetchLatestPreviousPoint()) {
finalTsKvQueryList = new ArrayList<>(tsKvQueryList);
finalTsKvQueryList.addAll(keys.stream().map(key -> new BaseReadTsKvQuery(
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg()
)).collect(Collectors.toList()));
finalTsKvQueryList.addAll(keys.stream().map(key -> {
var query = new BaseReadTsKvQuery(
key, cmd.getStartTs() - TimeUnit.DAYS.toMillis(365), cmd.getStartTs(), cmd.getInterval(), 1, cmd.getAgg());
queriesKeys.put(query.getId(), query.getKey());
return query;
}
).collect(Collectors.toList()));
} else {
finalTsKvQueryList = tsKvQueryList;
}
@ -535,8 +561,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
List<ReadTsKvQueryResult> queryResults = future.get();
if (queryResults != null) {
for (ReadTsKvQueryResult queryResult : queryResults) {
entityData.getTimeseries().put(queryResult.getKey(), queryResult.toTsValues());
lastTsMap.put(queryResult.getKey(), queryResult.getLastEntryTs());
String queryKey = queriesKeys.get(queryResult.getQueryId());
entityData.getTimeseries().put(queryKey, queryResult.toTsValues());
lastTsMap.put(queryKey, queryResult.getLastEntryTs());
}
}
// Populate with empty values if no data found.

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import lombok.Data;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.service.telemetry.cmd.v2.AggKey;
@Data
public class ReadTsKvQueryInfo {
private final AggKey key;
private final ReadTsKvQuery query;
private final boolean previous;
}

View File

@ -21,6 +21,12 @@ import org.thingsboard.server.common.data.kv.Aggregation;
@Data
public class AggKey {
private int id;
private String key;
private Aggregation agg;
private Long previousStartTs;
private Long previousEndTs;
private Boolean previousValueOnly;
}

View File

@ -31,8 +31,7 @@ public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery {
this(key, startTs, endTs, interval, limit, aggregation, "DESC");
}
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation,
String order) {
public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String order) {
super(key, startTs, endTs);
this.interval = interval;
this.limit = limit;

View File

@ -20,11 +20,16 @@ import lombok.Data;
@Data
public class BaseTsKvQuery implements TsKvQuery {
private static final ThreadLocal<Integer> idSeq = ThreadLocal.withInitial(() -> 0);
private final int id;
private final String key;
private final long startTs;
private final long endTs;
public BaseTsKvQuery(String key, long startTs, long endTs) {
this.id = idSeq.get();
idSeq.set(id + 1);
this.key = key;
this.startTs = startTs;
this.endTs = endTs;

View File

@ -24,9 +24,7 @@ import java.util.List;
@Data
public class ReadTsKvQueryResult {
private final String key;
// Holds the aggregation from the query
private final Aggregation agg;
private final int queryId;
// Holds the data list;
private final List<TsKvEntry> data;
// Holds the max ts of the records that match aggregation intervals (not the ts of the aggregation window, but the ts of the last record among all the intervals)

View File

@ -17,6 +17,8 @@ package org.thingsboard.server.common.data.kv;
public interface TsKvQuery {
int getId();
String getKey();
long getStartTs();

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.query;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ComparisonTsValue {
private TsValue current;
private TsValue previous;
}

View File

@ -15,11 +15,9 @@
*/
package org.thingsboard.server.common.data.query;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import java.util.Map;
@ -30,7 +28,7 @@ public class EntityData {
private final EntityId entityId;
private final Map<EntityKeyType, Map<String, TsValue>> latest;
private final Map<String, TsValue[]> timeseries;
private final Map<Aggregation, Map<String, TsValue>> aggLatest;
private final Map<Integer, ComparisonTsValue> aggLatest;
public EntityData(EntityId entityId, Map<EntityKeyType, Map<String, TsValue>> latest, Map<String, TsValue[]> timeseries) {
this(entityId, latest, timeseries, null);

View File

@ -149,7 +149,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
tsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(query.getKey()));
List<TsKvEntry> tsKvEntries = DaoUtil.convertDataList(tsKvEntities);
long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getId(), tsKvEntries, lastTs);
}
Optional<TsKvEntity> findAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long ts, Aggregation aggregation) {

View File

@ -99,7 +99,7 @@ public abstract class BaseAbstractSqlTimeseriesDao extends JpaAbstractDaoListeni
if (lastTs.isEmpty()) {
lastTs = data.stream().map(AbstractTsKvEntity::getTs).filter(Objects::nonNull).max(Long::compare);
}
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs()));
return new ReadTsKvQueryResult(query.getId(), DaoUtil.convertDataList(data), lastTs.orElse(query.getStartTs()));
}
}, service);
}

View File

@ -181,7 +181,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
timescaleTsKvEntities.forEach(tsKvEntity -> tsKvEntity.setStrKey(strKey));
var tsKvEntries = DaoUtil.convertDataList(timescaleTsKvEntities);
long lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getId(), tsKvEntries, lastTs);
}
private List<Optional<? extends AbstractTsKvEntity>> findAllAndAggregateAsync(EntityId entityId, String key, long startTs, long endTs, long timeBucket, Aggregation aggregation) {

View File

@ -46,6 +46,7 @@ import org.thingsboard.server.dao.service.Validator;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.StringUtils.isBlank;

View File

@ -285,7 +285,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
@Override
public ReadTsKvQueryResult apply(@Nullable List<Optional<TsKvEntryAggWrapper>> input) {
if (input == null) {
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), Collections.emptyList(), query.getStartTs());
return new ReadTsKvQueryResult(query.getId(), Collections.emptyList(), query.getStartTs());
} else {
long maxTs = query.getStartTs();
List<TsKvEntry> data = new ArrayList<>();
@ -296,7 +296,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
data.add(tsKvEntryAggWrapper.getEntry());
}
}
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), data, maxTs);
return new ReadTsKvQueryResult(query.getId(), data, maxTs);
}
}
@ -333,7 +333,7 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
if (tsKvEntries != null) {
lastTs = tsKvEntries.stream().map(TsKvEntry::getTs).max(Long::compare).orElse(query.getStartTs());
}
return new ReadTsKvQueryResult(query.getKey(), query.getAggregation(), tsKvEntries, lastTs);
return new ReadTsKvQueryResult(query.getId(), tsKvEntries, lastTs);
}, MoreExecutors.directExecutor());
}