diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index fbd0cce408..9474a621e1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -151,20 +151,9 @@ public final class PluginProcessingContext implements PluginContext { } @Override - public List loadTimeseries(DeviceId deviceId, TsKvQuery query) { + public void loadTimeseries(DeviceId deviceId, List queries, PluginCallback> callback) { validate(deviceId); - try { - return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get(); - } catch (Exception e) { - log.error("TODO", e); - throw new RuntimeException(e); - } - } - - @Override - public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback> callback) { - validate(deviceId); - ListenableFuture> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query); + ListenableFuture> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries); Futures.addCallback(future, getCallback(callback, v -> v), executor); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java index 5cf71fcce8..81bbdf9309 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesDao.java @@ -96,7 +96,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao { } @Override - public ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query) { + public ListenableFuture> findAllAsync(String entityType, UUID entityId, List queries) { + List>> futures = queries.stream().map(query -> findAllAsync(entityType, entityId, query)).collect(Collectors.toList()); + return Futures.transform(Futures.allAsList(futures), new Function>, List>() { + @Nullable + @Override + public List apply(@Nullable List> results) { + List result = new ArrayList(); + results.forEach(r -> result.addAll(r)); + return result; + } + }, readResultsProcessingExecutor); + } + + + private ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { return findAllAsyncWithLimit(entityType, entityId, query); } else { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 1d8c3dfd4e..f27ed6e605 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; +import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -32,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.service.Validator; +import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.Instant; @@ -40,6 +42,7 @@ import java.time.ZoneOffset; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; @@ -56,10 +59,10 @@ public class BaseTimeseriesService implements TimeseriesService { private TimeseriesDao timeseriesDao; @Override - public ListenableFuture> findAll(String entityType, UUIDBased entityId, TsKvQuery query) { + public ListenableFuture> findAll(String entityType, UUIDBased entityId, List queries) { validate(entityType, entityId); - validate(query); - return timeseriesDao.findAllAsync(entityType, entityId.getId(), query); + queries.forEach(query -> validate(query)); + return timeseriesDao.findAllAsync(entityType, entityId.getId(), queries); } @Override @@ -132,7 +135,7 @@ public class BaseTimeseriesService implements TimeseriesService { throw new IncorrectParameterException("TsKvQuery can't be null"); } else if (isBlank(query.getKey())) { throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty"); - } else if (query.getAggregation() == null){ + } else if (query.getAggregation() == null) { throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty"); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 7a6eed7eb0..177003ddf2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -33,9 +33,7 @@ public interface TimeseriesDao { long toPartitionTs(long ts); - ListenableFuture> findAllAsync(String entityType, UUID entityId, TsKvQuery query); - -// List find(String entityType, UUID entityId, TsKvQuery query, Optional minPartition, Optional maxPartition); + ListenableFuture> findAllAsync(String entityType, UUID entityId, List queries); ResultSetFuture findLatest(String entityType, UUID entityId, String key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 1bafdea37f..cd53e9468f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -33,7 +33,7 @@ import java.util.Set; */ public interface TimeseriesService { - ListenableFuture> findAll(String entityType, UUIDBased entityId, TsKvQuery query); + ListenableFuture> findAll(String entityType, UUIDBased entityId, List queries); ListenableFuture> findLatest(String entityType, UUIDBased entityId, Collection keys); diff --git a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java index b5f27a1e5a..c2c5587704 100644 --- a/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java +++ b/extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/PluginContext.java @@ -82,9 +82,7 @@ public interface PluginContext { void saveTsData(DeviceId deviceId, List entry, PluginCallback callback); - List loadTimeseries(DeviceId deviceId, TsKvQuery query); - - void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback> callback); + void loadTimeseries(DeviceId deviceId, List queries, PluginCallback> callback); void loadLatestTimeseries(DeviceId deviceId, Collection keys, PluginCallback> callback); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java index 14d7aa1f86..9f068950ee 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/GetHistoryCmd.java @@ -15,9 +15,16 @@ */ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * @author Andrew Shvayka */ +@NoArgsConstructor +@AllArgsConstructor +@Data public class GetHistoryCmd implements TelemetryPluginCmd { private int cmdId; @@ -25,46 +32,7 @@ public class GetHistoryCmd implements TelemetryPluginCmd { private String keys; private long startTs; private long endTs; + private int limit; + private String agg; - @Override - public int getCmdId() { - return cmdId; - } - - @Override - public void setCmdId(int cmdId) { - this.cmdId = cmdId; - } - - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getKeys() { - return keys; - } - - public void setKeys(String keys) { - this.keys = keys; - } - - public long getStartTs() { - return startTs; - } - - public void setStartTs(long startTs) { - this.startTs = startTs; - } - - public long getEndTs() { - return endTs; - } - - public void setEndTs(long endTs) { - this.endTs = endTs; - } } diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java index 718f23a20d..3574eae0ec 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/SubscriptionCmd.java @@ -16,11 +16,13 @@ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd; import lombok.AllArgsConstructor; +import lombok.Data; import lombok.NoArgsConstructor; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType; @NoArgsConstructor @AllArgsConstructor +@Data public abstract class SubscriptionCmd implements TelemetryPluginCmd { private int cmdId; @@ -31,46 +33,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd { public abstract SubscriptionType getType(); - public int getCmdId() { - return cmdId; - } - - public void setCmdId(int cmdId) { - this.cmdId = cmdId; - } - - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getKeys() { - return keys; - } - - public void setTags(String tags) { - this.keys = tags; - } - - public boolean isUnsubscribe() { - return unsubscribe; - } - - public void setUnsubscribe(boolean unsubscribe) { - this.unsubscribe = unsubscribe; - } - - public String getScope() { - return scope; - } - - public void setKeys(String keys) { - this.keys = keys; - } - @Override public String toString() { return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]"; diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java index 4f24a0041e..f4eacf587a 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/cmd/TimeseriesSubscriptionCmd.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.extensions.core.plugin.telemetry.cmd; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.NoArgsConstructor; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType; @@ -22,17 +24,13 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT * @author Andrew Shvayka */ @NoArgsConstructor +@AllArgsConstructor +@Data public class TimeseriesSubscriptionCmd extends SubscriptionCmd { private long timeWindow; - - public long getTimeWindow() { - return timeWindow; - } - - public void setTimeWindow(long timeWindow) { - this.timeWindow = timeWindow; - } + private int limit; + private String agg; @Override public SubscriptionType getType() { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index 5d6e68ee30..739bedfd89 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -158,40 +158,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId()); long endTs = System.currentTimeMillis(); startTs = endTs - cmd.getTimeWindow(); - for (String key : keys) { - TsKvQuery query = new BaseTsKvQuery(key, startTs, endTs); - data.addAll(ctx.loadTimeseries(deviceId, query)); - } - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); - Map subState = new HashMap<>(keys.size()); - keys.forEach(key -> subState.put(key, startTs)); - data.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState); - subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList()); + ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); startTs = System.currentTimeMillis(); log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), cmd.getDeviceId()); - ctx.loadLatestTimeseries(deviceId, keys, new PluginCallback>() { - @Override - public void onSuccess(PluginContext ctx, List data) { - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); - - Map subState = new HashMap<>(keys.size()); - keys.forEach(key -> subState.put(key, startTs)); - data.forEach(v -> subState.put(v.getKey(), v.getTs())); - SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState); - subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); - } - - @Override - public void onFailure(PluginContext ctx, Exception e) { - SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, - "Failed to fetch data!"); - sendWsMsg(ctx, sessionRef, update); - } - }); + ctx.loadLatestTimeseries(deviceId, keys, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys)); } } else { ctx.loadLatestTimeseries(deviceId, new PluginCallback>() { @@ -216,6 +190,28 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { } } + private PluginCallback> getSubscriptionCallback(final PluginWebsocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final DeviceId deviceId, final long startTs, final List keys) { + return new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + + Map subState = new HashMap<>(keys.size()); + keys.forEach(key -> subState.put(key, startTs)); + data.forEach(v -> subState.put(v.getKey(), v.getTs())); + SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState); + subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch data!"); + sendWsMsg(ctx, sessionRef, update); + } + }; + } + private void handleWsHistoryCmd(PluginContext ctx, PluginWebsocketSessionRef sessionRef, GetHistoryCmd cmd) { String sessionId = sessionRef.getSessionId(); WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId); @@ -246,12 +242,19 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { return; } List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); - List data = new ArrayList<>(); - for (String key : keys) { - TsKvQuery query = new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs()); - data.addAll(ctx.loadTimeseries(deviceId, query)); - } - sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList()); + ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR, + "Failed to fetch data!")); + } + }); } private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {