From e37e7242fdb01e4cd385c347f14e6706bd6a40b8 Mon Sep 17 00:00:00 2001 From: Dima Landiak Date: Mon, 4 Jun 2018 19:38:08 +0300 Subject: [PATCH] find and save new latest if previous deleted --- .../server/controller/DeviceController.java | 5 +++- .../server/common/data/kv/BaseTsKvQuery.java | 7 +++-- .../server/common/data/kv/TsKvQuery.java | 2 ++ .../CassandraBaseTimeseriesDao.java | 29 ++++++++++--------- .../timeseries/BaseTimeseriesServiceTest.java | 12 ++++---- .../handlers/TelemetryRestMsgHandler.java | 10 +++---- .../TelemetryWebsocketMsgHandler.java | 6 ++-- 7 files changed, 42 insertions(+), 29 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index ed9bf77c46..856ca8e399 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -413,8 +414,10 @@ public class DeviceController extends BaseController { Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test"); + long startTs = 1519561033000L; + long endTs = 1528260633000L; timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test", - 1519139033000L, 1524668633000L))).get(); + startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get(); } catch (Exception e) { throw handleException(e); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 51d4ad2004..55d279768e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -27,8 +27,10 @@ public class BaseTsKvQuery implements TsKvQuery { private final int limit; private final Aggregation aggregation; private final String orderBy; + private final Boolean rewriteLatestIfDeleted; - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) { + public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy, + boolean rewriteLatestIfDeleted) { this.key = key; this.startTs = startTs; this.endTs = endTs; @@ -36,10 +38,11 @@ public class BaseTsKvQuery implements TsKvQuery { this.limit = limit; this.aggregation = aggregation; this.orderBy = orderBy; + this.rewriteLatestIfDeleted = rewriteLatestIfDeleted; } public BaseTsKvQuery(String key, long startTs, long endTs) { - this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java index 9b907c3440..825df6c176 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java @@ -30,4 +30,6 @@ public interface TsKvQuery { Aggregation getAggregation(); String getOrderBy(); + + Boolean getRewriteLatestIfDeleted(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 423e90efbf..3e4e2bdac5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -134,7 +134,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem while (stepTs < query.getEndTs()) { long startTs = stepTs; long endTs = stepTs + step; - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy()); + TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false); futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); stepTs = endTs; } @@ -400,15 +400,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return Futures.immediateFuture(false); }, readResultsProcessingExecutor); - - ListenableFuture savedLatestFuture = Futures.transform(booleanFuture, - (AsyncFunction) isRemove -> { - if (isRemove) { - return getNewLatestEntryFuture(entityId, query); - } - return Futures.immediateFuture(null); - }, readResultsProcessingExecutor); - ListenableFuture removedLatestFuture = Futures.transform(booleanFuture, (AsyncFunction) isRemove -> { if (isRemove) { @@ -416,15 +407,27 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } return Futures.immediateFuture(null); }, readResultsProcessingExecutor); - return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)), - (AsyncFunction, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor); + + if (query.getRewriteLatestIfDeleted()) { + ListenableFuture savedLatestFuture = Futures.transform(booleanFuture, + (AsyncFunction) isRemove -> { + if (isRemove) { + return getNewLatestEntryFuture(entityId, query); + } + return Futures.immediateFuture(null); + }, readResultsProcessingExecutor); + + return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)), + (AsyncFunction, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor); + } + return removedLatestFuture; } private ListenableFuture getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, - Aggregation.NONE, DESC_ORDER); + Aggregation.NONE, DESC_ORDER, false); ListenableFuture> future = findAllAsync(entityId, findNewLatestQuery); return Futures.transform(future, (AsyncFunction, Void>) entryList -> { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 181a19743b..a8d022ba2f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -127,7 +127,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { entries.add(save(deviceId, 55000, 600)); List list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(55000, list.get(0).getTs()); assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); @@ -139,7 +139,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); @@ -151,7 +151,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -164,7 +164,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -177,7 +177,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -190,7 +190,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get(); + 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java index 0c7e3874ee..bb0813ff77 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryRestMsgHandler.java @@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.*; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -138,9 +137,10 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) : - Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); + Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC")) + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), + interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC", false)) .collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg)); } else { @@ -218,7 +218,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } private boolean handleHttpPostAttributes(PluginContext ctx, PluginRestMsg msg, RestRequest request, - EntityId entityId, String scope) throws ServletException, IOException { + EntityId entityId, String scope) throws ServletException, IOException { if (DataConstants.SERVER_SCOPE.equals(scope) || DataConstants.SHARED_SCOPE.equals(scope)) { JsonNode jsonNode; @@ -274,7 +274,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler { } } }); - return attributes; + return attributes; } private void handleHttpPostTimeseries(PluginContext ctx, PluginRestMsg msg, RestRequest request, EntityId entityId, long ttl) { diff --git a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java index bf75c5de29..c024b673e3 100644 --- a/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java +++ b/extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/telemetry/handlers/TelemetryWebsocketMsgHandler.java @@ -217,7 +217,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); startTs = cmd.getStartTs(); long endTs = cmd.getStartTs() + cmd.getTimeWindow(); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false)).collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys)); } else { List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); @@ -301,7 +302,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler { } EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)) + List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), + cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false)) .collect(Collectors.toList()); ctx.loadTimeseries(entityId, queries, new PluginCallback>() { @Override