diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index fb504843d8..c7cb47c2a1 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -87,30 +87,26 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } else if (method.equals("values")) { if ("timeseries".equals(entity)) { String keysStr = request.getParameter("keys"); + List keys = Arrays.asList(keysStr.split(",")); + Optional startTs = request.getLongParamValue("startTs"); Optional endTs = request.getLongParamValue("endTs"); Optional interval = request.getLongParamValue("interval"); Optional limit = request.getIntParamValue("limit"); - Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); - List keys = Arrays.asList(keysStr.split(",")); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)).collect(Collectors.toList()); - ctx.loadTimeseries(deviceId, queries, new PluginCallback>() { - @Override - public void onSuccess(PluginContext ctx, List data) { - Map> result = new LinkedHashMap<>(); - for (TsKvEntry entry : data) { - result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); - } - msg.getResponseHolder().setResult(new ResponseEntity<>(data, HttpStatus.OK)); + if (startTs.isPresent() || endTs.isPresent() || interval.isPresent() || limit.isPresent()) { + if (!startTs.isPresent() || !endTs.isPresent() || !interval.isPresent()) { + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); + return; } + Aggregation agg = Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); - @Override - public void onFailure(PluginContext ctx, Exception e) { - log.error("Failed to fetch historical data", e); - msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); - } - }); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg)) + .collect(Collectors.toList()); + ctx.loadTimeseries(deviceId, queries, getTsKvListCallback(msg)); + } else { + ctx.loadLatestTimeseries(deviceId, keys, getTsKvListCallback(msg)); + } } else if ("attributes".equals(entity)) { String keys = request.getParameter("keys", ""); @@ -137,6 +133,25 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } } + private PluginCallback> getTsKvListCallback(final PluginRestMsg msg) { + return new PluginCallback>() { + @Override + public void onSuccess(PluginContext ctx, List data) { + Map> result = new LinkedHashMap<>(); + for (TsKvEntry entry : data) { + result.put(entry.getKey(), data.stream().map(v -> new TsData(v.getTs(), v.getValueAsString())).collect(Collectors.toList())); + } + msg.getResponseHolder().setResult(new ResponseEntity<>(result, HttpStatus.OK)); + } + + @Override + public void onFailure(PluginContext ctx, Exception e) { + log.error("Failed to fetch historical data", e); + msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }; + } + @Override public void handleHttpPostRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException { RestRequest request = msg.getRequest();