diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b0b822b90e..a347abf138 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -176,9 +176,9 @@ cassandra: # Enable/disable secure connection ssl: "${CASSANDRA_USE_SSL:false}" # Enable/disable JMX - jmx: "${CASSANDRA_USE_JMX:true}" + jmx: "${CASSANDRA_USE_JMX:false}" # Enable/disable metrics collection. - metrics: "${CASSANDRA_USE_METRICS:true}" + metrics: "${CASSANDRA_USE_METRICS:false}" # NONE SNAPPY LZ4 compression: "${CASSANDRA_COMPRESSION:none}" # Specify cassandra cluster initialization timeout in milliseconds (if no hosts available during startup) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java index 19bf415cfd..b75c506ac9 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java @@ -16,11 +16,16 @@ package org.thingsboard.server.dao.nosql; import com.datastax.oss.driver.api.core.cql.Statement; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.cassandra.guava.GuavaSession; import org.thingsboard.server.dao.util.AsyncTask; +import java.util.function.Function; + /** * Created by ashvayka on 24.10.18. */ @@ -31,4 +36,11 @@ public class CassandraStatementTask implements AsyncTask { private final GuavaSession session; private final Statement statement; + public ListenableFuture executeAsync(Function executeAsyncFunction) { + return Futures.transform(session.executeAsync(statement), + result -> new TbResultSet(statement, result, executeAsyncFunction), + MoreExecutors.directExecutor() + ); + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java new file mode 100644 index 0000000000..735e6daab6 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSet.java @@ -0,0 +1,131 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import edu.umd.cs.findbugs.annotations.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Function; + +public class TbResultSet implements AsyncResultSet { + + private final Statement originalStatement; + private final AsyncResultSet delegate; + private final Function executeAsyncFunction; + + public TbResultSet(Statement originalStatement, AsyncResultSet delegate, + Function executeAsyncFunction) { + this.originalStatement = originalStatement; + this.delegate = delegate; + this.executeAsyncFunction = executeAsyncFunction; + } + + @NonNull + @Override + public ColumnDefinitions getColumnDefinitions() { + return delegate.getColumnDefinitions(); + } + + @NonNull + @Override + public ExecutionInfo getExecutionInfo() { + return delegate.getExecutionInfo(); + } + + @Override + public int remaining() { + return delegate.remaining(); + } + + @NonNull + @Override + public Iterable currentPage() { + return delegate.currentPage(); + } + + @Override + public boolean hasMorePages() { + return delegate.hasMorePages(); + } + + @NonNull + @Override + public CompletionStage fetchNextPage() throws IllegalStateException { + return delegate.fetchNextPage(); + } + + @Override + public boolean wasApplied() { + return delegate.wasApplied(); + } + + public ListenableFuture> allRows(Executor executor) { + List allRows = new ArrayList<>(); + SettableFuture> resultFuture = SettableFuture.create(); + this.processRows(originalStatement, delegate, allRows, resultFuture, executor); + return resultFuture; + } + + private void processRows(Statement statement, + AsyncResultSet resultSet, + List allRows, + SettableFuture> resultFuture, + Executor executor) { + allRows.addAll(loadRows(resultSet)); + if (resultSet.hasMorePages()) { + ByteBuffer nextPagingState = resultSet.getExecutionInfo().getPagingState(); + Statement nextStatement = statement.setPagingState(nextPagingState); + TbResultSetFuture resultSetFuture = executeAsyncFunction.apply(nextStatement); + Futures.addCallback(resultSetFuture, + new FutureCallback() { + @Override + public void onSuccess(@Nullable TbResultSet result) { + processRows(nextStatement, result, + allRows, resultFuture, executor); + } + + @Override + public void onFailure(Throwable t) { + resultFuture.setException(t); + } + }, executor != null ? executor : MoreExecutors.directExecutor() + ); + } else { + resultFuture.set(allRows); + } + } + + List loadRows(AsyncResultSet resultSet) { + return Lists.newArrayList(resultSet.currentPage()); + } + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java index 8a42b09b78..2799f4d0b9 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java @@ -27,19 +27,19 @@ import java.util.concurrent.TimeoutException; /** * Created by ashvayka on 24.10.18. */ -public class TbResultSetFuture implements ListenableFuture { +public class TbResultSetFuture implements ListenableFuture { - private final SettableFuture mainFuture; + private final SettableFuture mainFuture; - public TbResultSetFuture(SettableFuture mainFuture) { + public TbResultSetFuture(SettableFuture mainFuture) { this.mainFuture = mainFuture; } - public AsyncResultSet getUninterruptibly() { + public TbResultSet getUninterruptibly() { return getSafe(); } - public AsyncResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { + public TbResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { return getSafe(timeout, unit); } @@ -59,12 +59,12 @@ public class TbResultSetFuture implements ListenableFuture { } @Override - public AsyncResultSet get() throws InterruptedException, ExecutionException { + public TbResultSet get() throws InterruptedException, ExecutionException { return mainFuture.get(); } @Override - public AsyncResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + public TbResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return mainFuture.get(timeout, unit); } @@ -73,7 +73,7 @@ public class TbResultSetFuture implements ListenableFuture { mainFuture.addListener(listener, executor); } - private AsyncResultSet getSafe() { + private TbResultSet getSafe() { try { return mainFuture.get(); } catch (InterruptedException | ExecutionException e) { @@ -81,7 +81,7 @@ public class TbResultSetFuture implements ListenableFuture { } } - private AsyncResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException { + private TbResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException { try { return mainFuture.get(timeout, unit); } catch (InterruptedException | ExecutionException e) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java index b782ca0a5e..639ddf0877 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractAsyncDao.java @@ -52,21 +52,21 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { } } - protected ListenableFuture getFuture(TbResultSetFuture future, java.util.function.Function transformer) { - return Futures.transform(future, new Function() { + protected ListenableFuture getFuture(TbResultSetFuture future, java.util.function.Function transformer) { + return Futures.transform(future, new Function() { @Nullable @Override - public T apply(@Nullable AsyncResultSet input) { + public T apply(@Nullable TbResultSet input) { return transformer.apply(input); } }, readResultsProcessingExecutor); } - protected ListenableFuture getFutureAsync(TbResultSetFuture future, com.google.common.util.concurrent.AsyncFunction transformer) { - return Futures.transformAsync(future, new AsyncFunction() { + protected ListenableFuture getFutureAsync(TbResultSetFuture future, com.google.common.util.concurrent.AsyncFunction transformer) { + return Futures.transformAsync(future, new AsyncFunction() { @Nullable @Override - public ListenableFuture apply(@Nullable AsyncResultSet input) { + public ListenableFuture apply(@Nullable TbResultSet input) { try { return transformer.apply(input); } catch (Exception e) { @@ -76,8 +76,4 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { }, readResultsProcessingExecutor); } - protected ListenableFuture> allRows(AsyncResultSet resultSet) { - return ResultSetUtils.allRows(resultSet, readResultsProcessingExecutor); - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java index e6db2c092f..5832acac53 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java @@ -39,7 +39,7 @@ import java.util.Map; @Component @Slf4j @NoSqlAnyDao -public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor { +public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor { @Autowired private EntityService entityService; @@ -107,19 +107,22 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< } @Override - protected SettableFuture create() { + protected SettableFuture create() { return SettableFuture.create(); } @Override - protected TbResultSetFuture wrap(CassandraStatementTask task, SettableFuture future) { + protected TbResultSetFuture wrap(CassandraStatementTask task, SettableFuture future) { return new TbResultSetFuture(future); } @Override - protected ListenableFuture execute(AsyncTaskContext taskCtx) { + protected ListenableFuture execute(AsyncTaskContext taskCtx) { CassandraStatementTask task = taskCtx.getTask(); - return task.getSession().executeAsync(task.getStatement()); + return task.executeAsync( + statement -> + this.submit(new CassandraStatementTask(task.getTenantId(), task.getSession(), statement)) + ); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/ResultSetUtils.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/ResultSetUtils.java deleted file mode 100644 index bf7116132a..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/ResultSetUtils.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.nosql; - -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.stream.Collectors; - -public class ResultSetUtils { - - public static ListenableFuture> allRows(AsyncResultSet resultSet, Executor executor) { - List> futures = new ArrayList<>(); - futures.add(Futures.immediateFuture(resultSet)); - while (resultSet.hasMorePages()) { - futures.add(toListenable(resultSet.fetchNextPage())); - } - return Futures.transform( Futures.allAsList(futures), - resultSets -> resultSets.stream() - .map(rs -> loadRows(rs)) - .flatMap(rows -> rows.stream()) - .collect(Collectors.toList()), - executor - ); - } - - private static ListenableFuture toListenable(CompletionStage completable) { - SettableFuture future = SettableFuture.create(); - completable.whenComplete( - (r, ex) -> { - if (ex != null) { - future.setException(ex); - } else { - future.set(r); - } - } - ); - return future; - } - - private static List loadRows(AsyncResultSet resultSet) { - return Lists.newArrayList(resultSet.currentPage()); - } -} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java index dad68c4bb0..41945526fe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/AggregatePartitionsFunction.java @@ -29,7 +29,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.nosql.ResultSetUtils; +import org.thingsboard.server.dao.nosql.TbResultSet; import javax.annotation.Nullable; import java.util.List; @@ -41,7 +41,7 @@ import java.util.stream.Collectors; * Created by ashvayka on 20.02.17. */ @Slf4j -public class AggregatePartitionsFunction implements com.google.common.util.concurrent.AsyncFunction, Optional> { +public class AggregatePartitionsFunction implements com.google.common.util.concurrent.AsyncFunction, Optional> { private static final int LONG_CNT_POS = 0; private static final int DOUBLE_CNT_POS = 1; @@ -67,14 +67,14 @@ public class AggregatePartitionsFunction implements com.google.common.util.concu } @Override - public ListenableFuture> apply(@Nullable List rsList) { + public ListenableFuture> apply(@Nullable List rsList) { log.trace("[{}][{}][{}] Going to aggregate data", key, ts, aggregation); if (rsList == null || rsList.isEmpty()) { return Futures.immediateFuture(Optional.empty()); } return Futures.transform( Futures.allAsList( - rsList.stream().map(rs -> ResultSetUtils.allRows(rs, this.executor)) + rsList.stream().map(rs -> rs.allRows(this.executor)) .collect(Collectors.toList())), rowsList -> { try { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 423e5e1593..9105e77956 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao; +import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; import org.thingsboard.server.dao.util.NoSqlTsDao; @@ -238,14 +239,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem BoundStatement stmt = stmtBuilder.build(); - Futures.addCallback(executeAsyncRead(tenantId, stmt), new FutureCallback() { + Futures.addCallback(executeAsyncRead(tenantId, stmt), new FutureCallback() { @Override - public void onSuccess(@Nullable AsyncResultSet result) { + public void onSuccess(@Nullable TbResultSet result) { if (result == null) { cursor.addData(convertResultToTsKvEntryList(Collections.emptyList())); findAllAsyncSequentiallyWithLimit(tenantId, cursor, resultFuture); } else { - Futures.addCallback(allRows(result), new FutureCallback>() { + Futures.addCallback(result.allRows(readResultsProcessingExecutor), new FutureCallback>() { @Override public void onSuccess(@Nullable List result) { @@ -278,21 +279,21 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem final long endTs = query.getEndTs(); final long ts = startTs + (endTs - startTs) / 2; ListenableFuture> partitionsListFuture = getPartitionsFuture(tenantId, query, entityId, minPartition, maxPartition); - ListenableFuture> aggregationChunks = Futures.transformAsync(partitionsListFuture, + ListenableFuture> aggregationChunks = Futures.transformAsync(partitionsListFuture, getFetchChunksAsyncFunction(tenantId, entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); return Futures.transformAsync(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts, readResultsProcessingExecutor), readResultsProcessingExecutor); } - private AsyncFunction> getPartitionsArrayFunction() { + private AsyncFunction> getPartitionsArrayFunction() { return rs -> - Futures.transform(allRows(rs), rows -> + Futures.transform(rs.allRows(readResultsProcessingExecutor), rows -> rows.stream() .map(row -> row.getLong(ModelConstants.PARTITION_COLUMN)).collect(Collectors.toList()), readResultsProcessingExecutor); } - private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { + private AsyncFunction, List> getFetchChunksAsyncFunction(TenantId tenantId, EntityId entityId, String key, Aggregation aggregation, long startTs, long endTs) { return partitions -> { try { PreparedStatement proto = getFetchStmt(aggregation, DESC_ORDER); @@ -684,8 +685,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return deletePartitionStmt; } - private ListenableFuture> convertAsyncResultSetToTsKvEntryList(AsyncResultSet rs) { - return Futures.transform(this.allRows(rs), + private ListenableFuture> convertAsyncResultSetToTsKvEntryList(TbResultSet rs) { + return Futures.transform(rs.allRows(readResultsProcessingExecutor), rows -> this.convertResultToTsKvEntryList(rows), readResultsProcessingExecutor); } diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 51f34a08d6..eb01f07369 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -8,7 +8,7 @@ cassandra.ssl=false cassandra.jmx=false -cassandra.metrics=true +cassandra.metrics=false cassandra.compression=none @@ -60,4 +60,4 @@ cassandra.query.tenant_rate_limits.enabled=false cassandra.query.tenant_rate_limits.configuration=5000:1,100000:60 cassandra.query.tenant_rate_limits.print_tenant_names=false -service.type=monolith \ No newline at end of file +service.type=monolith