Aggregation Implementation

This commit is contained in:
Andrew Shvayka 2017-02-21 13:34:57 +02:00
parent 87d05f7c84
commit 9328bbc0b7
10 changed files with 82 additions and 149 deletions

View File

@ -151,20 +151,9 @@ public final class PluginProcessingContext implements PluginContext {
}
@Override
public List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query) {
public void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback) {
validate(deviceId);
try {
return pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query).get();
} catch (Exception e) {
log.error("TODO", e);
throw new RuntimeException(e);
}
}
@Override
public void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback) {
validate(deviceId);
ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, query);
ListenableFuture<List<TsKvEntry>> future = pluginCtx.tsService.findAll(DataConstants.DEVICE, deviceId, queries);
Futures.addCallback(future, getCallback(callback, v -> v), executor);
}

View File

@ -96,7 +96,21 @@ public class BaseTimeseriesDao extends AbstractDao implements TimeseriesDao {
}
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
public ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries) {
List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityType, entityId, query)).collect(Collectors.toList());
return Futures.transform(Futures.allAsList(futures), new Function<List<List<TsKvEntry>>, List<TsKvEntry>>() {
@Nullable
@Override
public List<TsKvEntry> apply(@Nullable List<List<TsKvEntry>> results) {
List<TsKvEntry> result = new ArrayList<TsKvEntry>();
results.forEach(r -> result.addAll(r));
return result;
}
}, readResultsProcessingExecutor);
}
private ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query) {
if (query.getAggregation() == Aggregation.NONE) {
return findAllAsyncWithLimit(entityType, entityId, query);
} else {

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -32,6 +33,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.service.Validator;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Instant;
@ -40,6 +42,7 @@ import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@ -56,10 +59,10 @@ public class BaseTimeseriesService implements TimeseriesService {
private TimeseriesDao timeseriesDao;
@Override
public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query) {
public ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries) {
validate(entityType, entityId);
validate(query);
return timeseriesDao.findAllAsync(entityType, entityId.getId(), query);
queries.forEach(query -> validate(query));
return timeseriesDao.findAllAsync(entityType, entityId.getId(), queries);
}
@Override
@ -132,7 +135,7 @@ public class BaseTimeseriesService implements TimeseriesService {
throw new IncorrectParameterException("TsKvQuery can't be null");
} else if (isBlank(query.getKey())) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty");
} else if (query.getAggregation() == null){
} else if (query.getAggregation() == null) {
throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty");
}
}

View File

@ -33,9 +33,7 @@ public interface TimeseriesDao {
long toPartitionTs(long ts);
ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, TsKvQuery query);
// List<TsKvEntry> find(String entityType, UUID entityId, TsKvQuery query, Optional<Long> minPartition, Optional<Long> maxPartition);
ListenableFuture<List<TsKvEntry>> findAllAsync(String entityType, UUID entityId, List<TsKvQuery> queries);
ResultSetFuture findLatest(String entityType, UUID entityId, String key);

View File

@ -33,7 +33,7 @@ import java.util.Set;
*/
public interface TimeseriesService {
ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, TsKvQuery query);
ListenableFuture<List<TsKvEntry>> findAll(String entityType, UUIDBased entityId, List<TsKvQuery> queries);
ListenableFuture<List<ResultSet>> findLatest(String entityType, UUIDBased entityId, Collection<String> keys);

View File

@ -82,9 +82,7 @@ public interface PluginContext {
void saveTsData(DeviceId deviceId, List<TsKvEntry> entry, PluginCallback<Void> callback);
List<TsKvEntry> loadTimeseries(DeviceId deviceId, TsKvQuery query);
void loadTimeseries(DeviceId deviceId, TsKvQuery query, PluginCallback<List<TsKvEntry>> callback);
void loadTimeseries(DeviceId deviceId, List<TsKvQuery> queries, PluginCallback<List<TsKvEntry>> callback);
void loadLatestTimeseries(DeviceId deviceId, Collection<String> keys, PluginCallback<List<TsKvEntry>> callback);

View File

@ -15,9 +15,16 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author Andrew Shvayka
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class GetHistoryCmd implements TelemetryPluginCmd {
private int cmdId;
@ -25,46 +32,7 @@ public class GetHistoryCmd implements TelemetryPluginCmd {
private String keys;
private long startTs;
private long endTs;
private int limit;
private String agg;
@Override
public int getCmdId() {
return cmdId;
}
@Override
public void setCmdId(int cmdId) {
this.cmdId = cmdId;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getKeys() {
return keys;
}
public void setKeys(String keys) {
this.keys = keys;
}
public long getStartTs() {
return startTs;
}
public void setStartTs(long startTs) {
this.startTs = startTs;
}
public long getEndTs() {
return endTs;
}
public void setEndTs(long endTs) {
this.endTs = endTs;
}
}

View File

@ -16,11 +16,13 @@
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
@NoArgsConstructor
@AllArgsConstructor
@Data
public abstract class SubscriptionCmd implements TelemetryPluginCmd {
private int cmdId;
@ -31,46 +33,6 @@ public abstract class SubscriptionCmd implements TelemetryPluginCmd {
public abstract SubscriptionType getType();
public int getCmdId() {
return cmdId;
}
public void setCmdId(int cmdId) {
this.cmdId = cmdId;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getKeys() {
return keys;
}
public void setTags(String tags) {
this.keys = tags;
}
public boolean isUnsubscribe() {
return unsubscribe;
}
public void setUnsubscribe(boolean unsubscribe) {
this.unsubscribe = unsubscribe;
}
public String getScope() {
return scope;
}
public void setKeys(String keys) {
this.keys = keys;
}
@Override
public String toString() {
return "SubscriptionCmd [deviceId=" + deviceId + ", tags=" + keys + ", unsubscribe=" + unsubscribe + "]";

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.extensions.core.plugin.telemetry.cmd;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionType;
@ -22,17 +24,13 @@ import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionT
* @author Andrew Shvayka
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TimeseriesSubscriptionCmd extends SubscriptionCmd {
private long timeWindow;
public long getTimeWindow() {
return timeWindow;
}
public void setTimeWindow(long timeWindow) {
this.timeWindow = timeWindow;
}
private int limit;
private String agg;
@Override
public SubscriptionType getType() {

View File

@ -158,40 +158,14 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), cmd.getDeviceId());
long endTs = System.currentTimeMillis();
startTs = endTs - cmd.getTimeWindow();
for (String key : keys) {
TsKvQuery query = new BaseTsKvQuery(key, startTs, endTs);
data.addAll(ctx.loadTimeseries(deviceId, query));
}
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
} else {
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
startTs = System.currentTimeMillis();
log.debug("[{}] fetching latest timeseries data for keys: ({}) for device : {}", sessionId, cmd.getKeys(), cmd.getDeviceId());
ctx.loadLatestTimeseries(deviceId, keys, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
}
@Override
public void onFailure(PluginContext ctx, Exception e) {
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
"Failed to fetch data!");
sendWsMsg(ctx, sessionRef, update);
}
});
ctx.loadLatestTimeseries(deviceId, keys, getSubscriptionCallback(sessionRef, cmd, sessionId, deviceId, startTs, keys));
}
} else {
ctx.loadLatestTimeseries(deviceId, new PluginCallback<List<TsKvEntry>>() {
@ -216,6 +190,28 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
}
}
private PluginCallback<List<TsKvEntry>> getSubscriptionCallback(final PluginWebsocketSessionRef sessionRef, final TimeseriesSubscriptionCmd cmd, final String sessionId, final DeviceId deviceId, final long startTs, final List<String> keys) {
return new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
SubscriptionState sub = new SubscriptionState(sessionId, cmd.getCmdId(), deviceId, SubscriptionType.TIMESERIES, false, subState);
subscriptionManager.addLocalWsSubscription(ctx, sessionId, deviceId, sub);
}
@Override
public void onFailure(PluginContext ctx, Exception e) {
SubscriptionUpdate update = new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
"Failed to fetch data!");
sendWsMsg(ctx, sessionRef, update);
}
};
}
private void handleWsHistoryCmd(PluginContext ctx, PluginWebsocketSessionRef sessionRef, GetHistoryCmd cmd) {
String sessionId = sessionRef.getSessionId();
WsSessionMetaData sessionMD = wsSessionsMap.get(sessionId);
@ -246,12 +242,19 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
return;
}
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
List<TsKvEntry> data = new ArrayList<>();
for (String key : keys) {
TsKvQuery query = new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs());
data.addAll(ctx.loadTimeseries(deviceId, query));
}
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getLimit(), Aggregation.valueOf(cmd.getAgg()))).collect(Collectors.toList());
ctx.loadTimeseries(deviceId, queries, new PluginCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(PluginContext ctx, List<TsKvEntry> data) {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), data));
}
@Override
public void onFailure(PluginContext ctx, Exception e) {
sendWsMsg(ctx, sessionRef, new SubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR,
"Failed to fetch data!"));
}
});
}
private boolean validateSessionMetadata(PluginContext ctx, PluginWebsocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {