find and save new latest if previous deleted
This commit is contained in:
parent
faf14d43a8
commit
e37e7242fd
@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery;
|
|||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.CustomerId;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.kv.Aggregation;
|
||||||
import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
|
import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||||
@ -413,8 +414,10 @@ public class DeviceController extends BaseController {
|
|||||||
|
|
||||||
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
|
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
|
||||||
|
|
||||||
|
long startTs = 1519561033000L;
|
||||||
|
long endTs = 1528260633000L;
|
||||||
timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
|
timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
|
||||||
1519139033000L, 1524668633000L))).get();
|
startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get();
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw handleException(e);
|
throw handleException(e);
|
||||||
|
|||||||
@ -27,8 +27,10 @@ public class BaseTsKvQuery implements TsKvQuery {
|
|||||||
private final int limit;
|
private final int limit;
|
||||||
private final Aggregation aggregation;
|
private final Aggregation aggregation;
|
||||||
private final String orderBy;
|
private final String orderBy;
|
||||||
|
private final Boolean rewriteLatestIfDeleted;
|
||||||
|
|
||||||
public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy) {
|
public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, String orderBy,
|
||||||
|
boolean rewriteLatestIfDeleted) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.startTs = startTs;
|
this.startTs = startTs;
|
||||||
this.endTs = endTs;
|
this.endTs = endTs;
|
||||||
@ -36,10 +38,11 @@ public class BaseTsKvQuery implements TsKvQuery {
|
|||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
this.aggregation = aggregation;
|
this.aggregation = aggregation;
|
||||||
this.orderBy = orderBy;
|
this.orderBy = orderBy;
|
||||||
|
this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
public BaseTsKvQuery(String key, long startTs, long endTs) {
|
public BaseTsKvQuery(String key, long startTs, long endTs) {
|
||||||
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC");
|
this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,4 +30,6 @@ public interface TsKvQuery {
|
|||||||
Aggregation getAggregation();
|
Aggregation getAggregation();
|
||||||
|
|
||||||
String getOrderBy();
|
String getOrderBy();
|
||||||
|
|
||||||
|
Boolean getRewriteLatestIfDeleted();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -134,7 +134,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
while (stepTs < query.getEndTs()) {
|
while (stepTs < query.getEndTs()) {
|
||||||
long startTs = stepTs;
|
long startTs = stepTs;
|
||||||
long endTs = stepTs + step;
|
long endTs = stepTs + step;
|
||||||
TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy());
|
TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy(), false);
|
||||||
futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
|
futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs)));
|
||||||
stepTs = endTs;
|
stepTs = endTs;
|
||||||
}
|
}
|
||||||
@ -400,15 +400,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
return Futures.immediateFuture(false);
|
return Futures.immediateFuture(false);
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
|
|
||||||
|
|
||||||
ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
|
|
||||||
(AsyncFunction<Boolean, Void>) isRemove -> {
|
|
||||||
if (isRemove) {
|
|
||||||
return getNewLatestEntryFuture(entityId, query);
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}, readResultsProcessingExecutor);
|
|
||||||
|
|
||||||
ListenableFuture<Void> removedLatestFuture = Futures.transform(booleanFuture,
|
ListenableFuture<Void> removedLatestFuture = Futures.transform(booleanFuture,
|
||||||
(AsyncFunction<Boolean, Void>) isRemove -> {
|
(AsyncFunction<Boolean, Void>) isRemove -> {
|
||||||
if (isRemove) {
|
if (isRemove) {
|
||||||
@ -416,15 +407,27 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
|||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
|
|
||||||
(AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
|
if (query.getRewriteLatestIfDeleted()) {
|
||||||
|
ListenableFuture<Void> savedLatestFuture = Futures.transform(booleanFuture,
|
||||||
|
(AsyncFunction<Boolean, Void>) isRemove -> {
|
||||||
|
if (isRemove) {
|
||||||
|
return getNewLatestEntryFuture(entityId, query);
|
||||||
|
}
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
|
}, readResultsProcessingExecutor);
|
||||||
|
|
||||||
|
return Futures.transform(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)),
|
||||||
|
(AsyncFunction<List<Void>, Void>) list -> Futures.immediateFuture(null), readResultsProcessingExecutor);
|
||||||
|
}
|
||||||
|
return removedLatestFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
|
private ListenableFuture<Void> getNewLatestEntryFuture(EntityId entityId, TsKvQuery query) {
|
||||||
long startTs = 0;
|
long startTs = 0;
|
||||||
long endTs = query.getStartTs() - 1;
|
long endTs = query.getStartTs() - 1;
|
||||||
TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
|
TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
|
||||||
Aggregation.NONE, DESC_ORDER);
|
Aggregation.NONE, DESC_ORDER, false);
|
||||||
ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
|
ListenableFuture<List<TsKvEntry>> future = findAllAsync(entityId, findNewLatestQuery);
|
||||||
|
|
||||||
return Futures.transform(future, (AsyncFunction<List<TsKvEntry>, Void>) entryList -> {
|
return Futures.transform(future, (AsyncFunction<List<TsKvEntry>, Void>) entryList -> {
|
||||||
|
|||||||
@ -127,7 +127,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
entries.add(save(deviceId, 55000, 600));
|
entries.add(save(deviceId, 55000, 600));
|
||||||
|
|
||||||
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.NONE, DESC_ORDER, false))).get();
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(55000, list.get(0).getTs());
|
assertEquals(55000, list.get(0).getTs());
|
||||||
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
|
assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue());
|
||||||
@ -139,7 +139,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
|
assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue());
|
||||||
|
|
||||||
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.AVG, DESC_ORDER, false))).get();
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(10000, list.get(0).getTs());
|
assertEquals(10000, list.get(0).getTs());
|
||||||
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
|
assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue());
|
||||||
@ -151,7 +151,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
|
assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue());
|
||||||
|
|
||||||
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.SUM, DESC_ORDER, false))).get();
|
||||||
|
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(10000, list.get(0).getTs());
|
assertEquals(10000, list.get(0).getTs());
|
||||||
@ -164,7 +164,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
|
assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue());
|
||||||
|
|
||||||
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.MIN, DESC_ORDER, false))).get();
|
||||||
|
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(10000, list.get(0).getTs());
|
assertEquals(10000, list.get(0).getTs());
|
||||||
@ -177,7 +177,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
|
assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue());
|
||||||
|
|
||||||
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.MAX, DESC_ORDER, false))).get();
|
||||||
|
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(10000, list.get(0).getTs());
|
assertEquals(10000, list.get(0).getTs());
|
||||||
@ -190,7 +190,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
|
|||||||
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
|
assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue());
|
||||||
|
|
||||||
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0,
|
||||||
60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get();
|
60000, 20000, 3, Aggregation.COUNT, DESC_ORDER, false))).get();
|
||||||
|
|
||||||
assertEquals(3, list.size());
|
assertEquals(3, list.size());
|
||||||
assertEquals(10000, list.get(0).getTs());
|
assertEquals(10000, list.get(0).getTs());
|
||||||
|
|||||||
@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.EntityType;
|
|||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
import org.thingsboard.server.common.data.id.UUIDBased;
|
|
||||||
import org.thingsboard.server.common.data.kv.*;
|
import org.thingsboard.server.common.data.kv.*;
|
||||||
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
|
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
|
||||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||||
@ -138,9 +137,10 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
|||||||
|
|
||||||
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
|
// If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted
|
||||||
Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) :
|
Aggregation agg = (interval.isPresent() && interval.get() == 0) ? Aggregation.valueOf(Aggregation.NONE.name()) :
|
||||||
Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
|
Aggregation.valueOf(request.getParameter("agg", Aggregation.NONE.name()));
|
||||||
|
|
||||||
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(), interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC"))
|
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs.get(), endTs.get(),
|
||||||
|
interval.get(), limit.orElse(TelemetryWebsocketMsgHandler.DEFAULT_LIMIT), agg, "DESC", false))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg));
|
ctx.loadTimeseries(entityId, queries, getTsKvListCallback(msg));
|
||||||
} else {
|
} else {
|
||||||
@ -218,7 +218,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean handleHttpPostAttributes(PluginContext ctx, PluginRestMsg msg, RestRequest request,
|
private boolean handleHttpPostAttributes(PluginContext ctx, PluginRestMsg msg, RestRequest request,
|
||||||
EntityId entityId, String scope) throws ServletException, IOException {
|
EntityId entityId, String scope) throws ServletException, IOException {
|
||||||
if (DataConstants.SERVER_SCOPE.equals(scope) ||
|
if (DataConstants.SERVER_SCOPE.equals(scope) ||
|
||||||
DataConstants.SHARED_SCOPE.equals(scope)) {
|
DataConstants.SHARED_SCOPE.equals(scope)) {
|
||||||
JsonNode jsonNode;
|
JsonNode jsonNode;
|
||||||
@ -274,7 +274,7 @@ public class TelemetryRestMsgHandler extends DefaultRestMsgHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return attributes;
|
return attributes;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleHttpPostTimeseries(PluginContext ctx, PluginRestMsg msg, RestRequest request, EntityId entityId, long ttl) {
|
private void handleHttpPostTimeseries(PluginContext ctx, PluginRestMsg msg, RestRequest request, EntityId entityId, long ttl) {
|
||||||
|
|||||||
@ -217,7 +217,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
|
log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId);
|
||||||
startTs = cmd.getStartTs();
|
startTs = cmd.getStartTs();
|
||||||
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
|
long endTs = cmd.getStartTs() + cmd.getTimeWindow();
|
||||||
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY)).collect(Collectors.toList());
|
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(),
|
||||||
|
getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false)).collect(Collectors.toList());
|
||||||
ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys));
|
ctx.loadTimeseries(entityId, queries, getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys));
|
||||||
} else {
|
} else {
|
||||||
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
||||||
@ -301,7 +302,8 @@ public class TelemetryWebsocketMsgHandler extends DefaultWebsocketMsgHandler {
|
|||||||
}
|
}
|
||||||
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
|
EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId());
|
||||||
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
List<String> keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet()));
|
||||||
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY))
|
List<TsKvQuery> queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(),
|
||||||
|
cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), ORDER_BY, false))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() {
|
ctx.loadTimeseries(entityId, queries, new PluginCallback<List<TsKvEntry>>() {
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user