diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index ff2ed25e19..7062ca97f9 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -28,7 +28,13 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.util.StringUtils; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; @@ -39,7 +45,19 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.UUIDBased; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.AttributeKey; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.transport.adaptor.JsonConverter; @@ -56,7 +74,13 @@ import org.thingsboard.server.service.telemetry.exception.UncheckedApiException; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -176,7 +200,7 @@ public class TelemetryController extends BaseController { (result, entityId) -> { // If interval is 0, convert this to a NONE aggregation, which is probably what the user really wanted Aggregation agg = interval == 0L ? Aggregation.valueOf(Aggregation.NONE.name()) : Aggregation.valueOf(aggStr); - List queries = toKeysList(keys).stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, interval, limit, agg, "DESC")) + List queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg)) .collect(Collectors.toList()); Futures.addCallback(tsService.findAll(entityId, queries), getTsKvListCallback(result)); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 5c8503650b..548c41739e 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -35,16 +35,16 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseTsKvQuery; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.attributes.AttributesService; @@ -370,9 +370,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); } else if (subscription.getType() == TelemetryFeature.TIMESERIES) { long curTs = System.currentTimeMillis(); - List queries = new ArrayList<>(); + List queries = new ArrayList<>(); subscription.getKeyStates().entrySet().forEach(e -> { - queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs)); + queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs)); }); DonAsynchron.withCallback(tsService.findAll(entityId, queries), diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 0e8a0d492e..2ff8e89070 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -28,12 +28,22 @@ import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.service.security.ValidationResult; -import org.thingsboard.server.service.telemetry.cmd.*; +import org.thingsboard.server.service.telemetry.cmd.AttributesSubscriptionCmd; +import org.thingsboard.server.service.telemetry.cmd.GetHistoryCmd; +import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd; +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd; +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd; import org.thingsboard.server.service.telemetry.exception.UnauthorizedException; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; import org.thingsboard.server.service.telemetry.sub.SubscriptionState; @@ -43,7 +53,14 @@ import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -234,7 +251,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } EntityId entityId = EntityIdFactory.getByTypeAndId(cmd.getEntityType(), cmd.getEntityId()); List keys = new ArrayList<>(getKeys(cmd).orElse(Collections.emptySet())); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC")) + List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, cmd.getStartTs(), cmd.getEndTs(), cmd.getInterval(), getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))) .collect(Collectors.toList()); FutureCallback> callback = new FutureCallback>() { @@ -320,8 +337,8 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi log.debug("[{}] fetching timeseries data for last {} ms for keys: ({}) for device : {}", sessionId, cmd.getTimeWindow(), cmd.getKeys(), entityId); startTs = cmd.getStartTs(); long endTs = cmd.getStartTs() + cmd.getTimeWindow(); - List queries = keys.stream().map(key -> new BaseTsKvQuery(key, startTs, endTs, cmd.getInterval(), - getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()), "DESC")).collect(Collectors.toList()); + List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, cmd.getInterval(), + getLimit(cmd.getLimit()), getAggregation(cmd.getAgg()))).collect(Collectors.toList()); final FutureCallback> callback = getSubscriptionCallback(sessionRef, cmd, sessionId, entityId, startTs, keys); accessValidator.validate(sessionRef.getSecurityCtx(), entityId, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java index f0c8de20af..11b95ca62b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java @@ -18,7 +18,7 @@ package org.thingsboard.server.common.data.kv; import lombok.Data; @Data -public class BaseDeleteTsKvQuery extends BaseQuery implements DeleteTsKvQuery { +public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery { private final Boolean rewriteLatestIfDeleted; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java deleted file mode 100644 index b7e71395a0..0000000000 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseQuery.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.data.kv; - -import lombok.Data; - -@Data -public class BaseQuery implements Query { - - private final String key; - private final long startTs; - private final long endTs; - - public BaseQuery(String key, long startTs, long endTs) { - this.key = key; - this.startTs = startTs; - this.endTs = endTs; - } - -} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java new file mode 100644 index 0000000000..3c48adfc18 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseReadTsKvQuery.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import lombok.Data; + +@Data +public class BaseReadTsKvQuery extends BaseTsKvQuery implements ReadTsKvQuery { + + private final long interval; + private final int limit; + private final Aggregation aggregation; + private final String orderBy; + + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation) { + this(key, startTs, endTs, interval, limit, aggregation, "DESC"); + } + + public BaseReadTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, + String orderBy) { + super(key, startTs, endTs); + this.interval = interval; + this.limit = limit; + this.aggregation = aggregation; + this.orderBy = orderBy; + } + + public BaseReadTsKvQuery(String key, long startTs, long endTs) { + this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java index 873e1a27cf..c4fa69c18c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseTsKvQuery.java @@ -18,24 +18,16 @@ package org.thingsboard.server.common.data.kv; import lombok.Data; @Data -public class BaseTsKvQuery extends BaseQuery implements TsKvQuery { +public class BaseTsKvQuery implements TsKvQuery { - private final long interval; - private final int limit; - private final Aggregation aggregation; - private final String orderBy; - - public BaseTsKvQuery(String key, long startTs, long endTs, long interval, int limit, Aggregation aggregation, - String orderBy) { - super(key, startTs, endTs); - this.interval = interval; - this.limit = limit; - this.aggregation = aggregation; - this.orderBy = orderBy; - } + private final String key; + private final long startTs; + private final long endTs; public BaseTsKvQuery(String key, long startTs, long endTs) { - this(key, startTs, endTs, endTs - startTs, 1, Aggregation.AVG, "DESC"); + this.key = key; + this.startTs = startTs; + this.endTs = endTs; } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java index a0cd919ec6..7607a59e36 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.common.data.kv; -public interface DeleteTsKvQuery extends Query { +public interface DeleteTsKvQuery extends TsKvQuery { Boolean getRewriteLatestIfDeleted(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java similarity index 81% rename from common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java rename to common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java index 94506047c0..7d54745d11 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/Query.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/ReadTsKvQuery.java @@ -15,12 +15,14 @@ */ package org.thingsboard.server.common.data.kv; -public interface Query { +public interface ReadTsKvQuery extends TsKvQuery { - String getKey(); + long getInterval(); - long getStartTs(); + int getLimit(); - long getEndTs(); + Aggregation getAggregation(); + + String getOrderBy(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java index 9fe1136eb3..9df514e41d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvQuery.java @@ -15,14 +15,12 @@ */ package org.thingsboard.server.common.data.kv; -public interface TsKvQuery extends Query { +public interface TsKvQuery { - long getInterval(); + String getKey(); - int getLimit(); + long getStartTs(); - Aggregation getAggregation(); - - String getOrderBy(); + long getEndTs(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 56609a74ba..1eb2f0026d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -17,7 +17,11 @@ package org.thingsboard.server.dao.sql.timeseries; import com.google.common.base.Function; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -25,7 +29,12 @@ import org.springframework.data.domain.PageRequest; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.UUIDConverter; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.TsKvEntity; import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey; @@ -94,7 +103,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp } @Override - public ListenableFuture> findAllAsync(EntityId entityId, List queries) { + public ListenableFuture> findAllAsync(EntityId entityId, List queries) { List>> futures = queries .stream() .map(query -> findAllAsync(entityId, query)) @@ -113,7 +122,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp }, service); } - private ListenableFuture> findAllAsync(EntityId entityId, TsKvQuery query) { + private ListenableFuture> findAllAsync(EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { return findAllAsyncWithLimit(entityId, query); } else { @@ -220,7 +229,7 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp }); } - private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { return Futures.immediateFuture( DaoUtil.convertDataList( tsKvRepository.findAllWithLimit( diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 86952002e4..98e859f39f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -23,8 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.service.Validator; @@ -47,7 +47,7 @@ public class BaseTimeseriesService implements TimeseriesService { private TimeseriesDao timeseriesDao; @Override - public ListenableFuture> findAll(EntityId entityId, List queries) { + public ListenableFuture> findAll(EntityId entityId, List queries) { validate(entityId); queries.forEach(BaseTimeseriesService::validate); return timeseriesDao.findAllAsync(entityId, queries); @@ -118,13 +118,13 @@ public class BaseTimeseriesService implements TimeseriesService { Validator.validateEntityId(entityId, "Incorrect entityId " + entityId); } - private static void validate(TsKvQuery query) { + private static void validate(ReadTsKvQuery query) { if (query == null) { - throw new IncorrectParameterException("TsKvQuery can't be null"); + throw new IncorrectParameterException("ReadTsKvQuery can't be null"); } else if (isBlank(query.getKey())) { - throw new IncorrectParameterException("Incorrect TsKvQuery. Key can't be empty"); + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Key can't be empty"); } else if (query.getAggregation() == null) { - throw new IncorrectParameterException("Incorrect TsKvQuery. Aggregation can't be empty"); + throw new IncorrectParameterException("Incorrect ReadTsKvQuery. Aggregation can't be empty"); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 7be0e37cf7..03d569d78e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -15,7 +15,12 @@ */ package org.thingsboard.server.dao.timeseries; -import com.datastax.driver.core.*; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.google.common.base.Function; @@ -29,8 +34,18 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao; import org.thingsboard.server.dao.util.NoSqlDao; @@ -41,7 +56,11 @@ import javax.annotation.PreDestroy; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -111,7 +130,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } @Override - public ListenableFuture> findAllAsync(EntityId entityId, List queries) { + public ListenableFuture> findAllAsync(EntityId entityId, List queries) { List>> futures = queries.stream().map(query -> findAllAsync(entityId, query)).collect(Collectors.toList()); return Futures.transform(Futures.allAsList(futures), new Function>, List>() { @Nullable @@ -128,7 +147,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } - private ListenableFuture> findAllAsync(EntityId entityId, TsKvQuery query) { + private ListenableFuture> findAllAsync(EntityId entityId, ReadTsKvQuery query) { if (query.getAggregation() == Aggregation.NONE) { return findAllAsyncWithLimit(entityId, query); } else { @@ -138,7 +157,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem while (stepTs < query.getEndTs()) { long startTs = stepTs; long endTs = stepTs + step; - TsKvQuery subQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy()); + ReadTsKvQuery subQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, step, 1, query.getAggregation(), query.getOrderBy()); futures.add(findAndAggregateAsync(entityId, subQuery, toPartitionTs(startTs), toPartitionTs(endTs))); stepTs = endTs; } @@ -157,7 +176,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return tsFormat.getTruncateUnit().equals(TsPartitionDate.EPOCH_START); } - private ListenableFuture> getPartitionsFuture(TsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { + private ListenableFuture> getPartitionsFuture(ReadTsKvQuery query, EntityId entityId, long minPartition, long maxPartition) { if (isFixedPartitioning()) { //no need to fetch partitions from DB return Futures.immediateFuture(FIXED_PARTITION); } @@ -165,7 +184,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); } - private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, TsKvQuery query) { + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); final ListenableFuture> partitionsListFuture = getPartitionsFuture(query, entityId, minPartition, maxPartition); @@ -221,7 +240,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } } - private ListenableFuture> findAndAggregateAsync(EntityId entityId, TsKvQuery query, long minPartition, long maxPartition) { + private ListenableFuture> findAndAggregateAsync(EntityId entityId, ReadTsKvQuery query, long minPartition, long maxPartition) { final Aggregation aggregation = query.getAggregation(); final String key = query.getKey(); final long startTs = query.getStartTs(); @@ -448,7 +467,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem private ListenableFuture getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; - TsKvQuery findNewLatestQuery = new BaseTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, + ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, Aggregation.NONE, DESC_ORDER); ListenableFuture> future = findAllAsync(entityId, findNewLatestQuery); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java index bcd69757a9..f2106086ae 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/QueryCursor.java @@ -16,7 +16,7 @@ package org.thingsboard.server.dao.timeseries; import lombok.Getter; -import org.thingsboard.server.common.data.kv.Query; +import org.thingsboard.server.common.data.kv.TsKvQuery; import java.util.List; import java.util.UUID; @@ -37,7 +37,7 @@ public class QueryCursor { final List partitions; private int partitionIndex; - public QueryCursor(String entityType, UUID entityId, Query baseQuery, List partitions) { + public QueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List partitions) { this.entityType = entityType; this.entityId = entityId; this.key = baseQuery.getKey(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 7f05ff2501..a8cb547c3b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.timeseries; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import java.util.List; @@ -28,7 +28,7 @@ import java.util.List; */ public interface TimeseriesDao { - ListenableFuture> findAllAsync(EntityId entityId, List queries); + ListenableFuture> findAllAsync(EntityId entityId, List queries); ListenableFuture findLatest(EntityId entityId, String key); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 345d852979..c5076d2774 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.timeseries; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import java.util.Collection; import java.util.List; @@ -29,7 +29,7 @@ import java.util.List; */ public interface TimeseriesService { - ListenableFuture> findAll(EntityId entityId, List queries); + ListenableFuture> findAll(EntityId entityId, List queries); ListenableFuture> findLatest(EntityId entityId, Collection keys); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java index 2d93f5d0dd..b56be56637 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TsKvQueryCursor.java @@ -16,8 +16,8 @@ package org.thingsboard.server.dao.timeseries; import lombok.Getter; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.common.data.kv.TsKvQuery; import java.util.ArrayList; import java.util.List; @@ -38,7 +38,7 @@ public class TsKvQueryCursor extends QueryCursor { private int partitionIndex; private int currentLimit; - public TsKvQueryCursor(String entityType, UUID entityId, TsKvQuery baseQuery, List partitions) { + public TsKvQueryCursor(String entityType, UUID entityId, ReadTsKvQuery baseQuery, List partitions) { super(entityType, entityId, baseQuery, partitions); this.orderBy = baseQuery.getOrderBy(); this.partitionIndex = isDesc() ? partitions.size() - 1 : 0; diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 68b3fa1f31..4528d9afb1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -20,7 +20,16 @@ import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.Test; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.kv.*; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.service.AbstractServiceTest; import java.util.ArrayList; @@ -106,7 +115,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { new BaseDeleteTsKvQuery(STRING_KEY, 15000, 45000))).get(); List list = tsService.findAll(deviceId, Collections.singletonList( - new BaseTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE, DESC_ORDER))).get(); + new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get(); Assert.assertEquals(1, list.size()); List latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get(); @@ -127,8 +136,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { entries.add(save(deviceId, 45000, 500)); entries.add(save(deviceId, 55000, 600)); - List list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.NONE, DESC_ORDER))).get(); + List list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.NONE))).get(); assertEquals(3, list.size()); assertEquals(55000, list.get(0).getTs()); assertEquals(java.util.Optional.of(600L), list.get(0).getLongValue()); @@ -139,8 +148,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(35000, list.get(2).getTs()); assertEquals(java.util.Optional.of(400L), list.get(2).getLongValue()); - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.AVG, DESC_ORDER))).get(); + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.AVG))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); assertEquals(java.util.Optional.of(150L), list.get(0).getLongValue()); @@ -151,8 +160,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(550L), list.get(2).getLongValue()); - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.SUM, DESC_ORDER))).get(); + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.SUM))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -164,8 +173,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(1100L), list.get(2).getLongValue()); - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MIN, DESC_ORDER))).get(); + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.MIN))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -177,8 +186,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(500L), list.get(2).getLongValue()); - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.MAX, DESC_ORDER))).get(); + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.MAX))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs()); @@ -190,8 +199,8 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { assertEquals(50000, list.get(2).getTs()); assertEquals(java.util.Optional.of(600L), list.get(2).getLongValue()); - list = tsService.findAll(deviceId, Collections.singletonList(new BaseTsKvQuery(LONG_KEY, 0, - 60000, 20000, 3, Aggregation.COUNT, DESC_ORDER))).get(); + list = tsService.findAll(deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0, + 60000, 20000, 3, Aggregation.COUNT))).get(); assertEquals(3, list.size()); assertEquals(10000, list.get(0).getTs());