diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index da6db707f1..ec5dde5cdd 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -140,7 +140,7 @@ cassandra: buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" - rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:30000}" + rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" # SQL configuration parameters sql: diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java index d1af1677db..b38110b460 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraAbstractDao.java @@ -35,7 +35,6 @@ import org.thingsboard.server.dao.model.type.ComponentTypeCodec; import org.thingsboard.server.dao.model.type.DeviceCredentialsTypeCodec; import org.thingsboard.server.dao.model.type.EntityTypeCodec; import org.thingsboard.server.dao.model.type.JsonCodec; -import org.thingsboard.server.dao.util.BufferedRateLimiter; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -49,7 +48,7 @@ public abstract class CassandraAbstractDao { private ConcurrentMap preparedStatementMap = new ConcurrentHashMap<>(); @Autowired - private BufferedRateLimiter rateLimiter; + private CassandraBufferedRateExecutor rateLimiter; private Session session; @@ -115,12 +114,12 @@ public abstract class CassandraAbstractDao { if (statement.getConsistencyLevel() == null) { statement.setConsistencyLevel(level); } - return new RateLimitedResultSetFuture(getSession(), rateLimiter, statement); + return rateLimiter.submit(new CassandraStatementTask(getSession(), statement)); } private static String statementToString(Statement statement) { if (statement instanceof BoundStatement) { - return ((BoundStatement)statement).preparedStatement().getQueryString(); + return ((BoundStatement) statement).preparedStatement().getQueryString(); } else { return statement.toString(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java new file mode 100644 index 0000000000..478c76d5c0 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.SettableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.thingsboard.server.dao.nosql.tmp.AbstractBufferedRateExecutor; +import org.thingsboard.server.dao.nosql.tmp.AsyncTaskContext; +import org.thingsboard.server.dao.util.NoSqlAnyDao; + +import javax.annotation.PreDestroy; + +/** + * Created by ashvayka on 24.10.18. + */ +@Component +@Slf4j +@NoSqlAnyDao +public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor { + + public CassandraBufferedRateExecutor( + @Value("${cassandra.query.buffer_size}") int queueLimit, + @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, + @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime, + @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, + @Value("${cassandra.query.callback_threads:2}") int callbackThreads, + @Value("${cassandra.query.poll_ms:50}") long pollMs) { + super(queueLimit, concurrencyLimit, maxWaitTime, dispatcherThreads, callbackThreads, pollMs); + } + + @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") + public void printStats() { + log.info("Permits totalAdded [{}] totalLaunched [{}] totalReleased [{}] totalFailed [{}] totalExpired [{}] totalRejected [{}] currBuffer [{}] ", + totalAdded.getAndSet(0), totalLaunched.getAndSet(0), totalReleased.getAndSet(0), + totalFailed.getAndSet(0), totalExpired.getAndSet(0), totalRejected.getAndSet(0), + concurrencyLevel.get()); + } + + @PreDestroy + public void stop() { + super.stop(); + } + + @Override + protected SettableFuture create() { + return SettableFuture.create(); + } + + @Override + protected ResultSetFuture wrap(CassandraStatementTask task, SettableFuture future) { + return new TbResultSetFuture(future); + } + + @Override + protected ResultSetFuture execute(AsyncTaskContext taskCtx) { + CassandraStatementTask task = taskCtx.getTask(); + return task.getSession().executeAsync(task.getStatement()); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java new file mode 100644 index 0000000000..ea13679b7c --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraStatementTask.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import lombok.Data; +import org.thingsboard.server.dao.nosql.tmp.AsyncTask; + +/** + * Created by ashvayka on 24.10.18. + */ +@Data +public class CassandraStatementTask implements AsyncTask { + + private final Session session; + private final Statement statement; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java new file mode 100644 index 0000000000..574a5f5e69 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/TbResultSetFuture.java @@ -0,0 +1,94 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.util.concurrent.SettableFuture; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Created by ashvayka on 24.10.18. + */ +public class TbResultSetFuture implements ResultSetFuture { + + private final SettableFuture mainFuture; + + public TbResultSetFuture(SettableFuture mainFuture) { + this.mainFuture = mainFuture; + } + + @Override + public ResultSet getUninterruptibly() { + return getSafe(); + } + + @Override + public ResultSet getUninterruptibly(long timeout, TimeUnit unit) throws TimeoutException { + return getSafe(timeout, unit); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return mainFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return mainFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return mainFuture.isDone(); + } + + @Override + public ResultSet get() throws InterruptedException, ExecutionException { + return mainFuture.get(); + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return mainFuture.get(timeout, unit); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + mainFuture.addListener(listener, executor); + } + + private ResultSet getSafe() { + try { + return mainFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + private ResultSet getSafe(long timeout, TimeUnit unit) throws TimeoutException { + try { + return mainFuture.get(timeout, unit); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java new file mode 100644 index 0000000000..9ad8e18745 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AbstractBufferedRateExecutor.java @@ -0,0 +1,169 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql.tmp; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created by ashvayka on 24.10.18. + */ +@Slf4j +public abstract class AbstractBufferedRateExecutor, V> implements BufferedRateExecutor { + + private final long maxWaitTime; + private final long pollMs; + private final BlockingQueue> queue; + private final ExecutorService dispatcherExecutor; + private final ExecutorService callbackExecutor; + private final ScheduledExecutorService timeoutExecutor; + private final int concurrencyLimit; + + protected final AtomicInteger concurrencyLevel = new AtomicInteger(); + protected final AtomicInteger totalAdded = new AtomicInteger(); + protected final AtomicInteger totalLaunched = new AtomicInteger(); + protected final AtomicInteger totalReleased = new AtomicInteger(); + protected final AtomicInteger totalFailed = new AtomicInteger(); + protected final AtomicInteger totalExpired = new AtomicInteger(); + protected final AtomicInteger totalRejected = new AtomicInteger(); + + public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs) { + this.maxWaitTime = maxWaitTime; + this.pollMs = pollMs; + this.concurrencyLimit = concurrencyLimit; + this.queue = new LinkedBlockingDeque<>(queueLimit); + this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); + this.callbackExecutor = Executors.newFixedThreadPool(callbackThreads); + this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); + for (int i = 0; i < dispatcherThreads; i++) { + dispatcherExecutor.submit(this::dispatch); + } + } + + @Override + public F submit(T task) { + SettableFuture settableFuture = create(); + F result = wrap(task, settableFuture); + try { + totalAdded.incrementAndGet(); + queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis())); + } catch (IllegalStateException e) { + totalRejected.incrementAndGet(); + settableFuture.setException(e); + } + return result; + } + + public void stop() { + if (dispatcherExecutor != null) { + dispatcherExecutor.shutdownNow(); + } + if (callbackExecutor != null) { + callbackExecutor.shutdownNow(); + } + if (timeoutExecutor != null) { + timeoutExecutor.shutdownNow(); + } + } + + protected abstract SettableFuture create(); + + protected abstract F wrap(T task, SettableFuture future); + + protected abstract ListenableFuture execute(AsyncTaskContext taskCtx); + + private void dispatch() { + log.info("Buffered rate executor thread started"); + while (!Thread.interrupted()) { + int curLvl = concurrencyLevel.get(); + AsyncTaskContext taskCtx = null; + try { + if (curLvl <= concurrencyLimit) { + taskCtx = queue.take(); + final AsyncTaskContext finalTaskCtx = taskCtx; + logTask("Processing", finalTaskCtx); + concurrencyLevel.incrementAndGet(); + long timeout = finalTaskCtx.getCreateTime() + maxWaitTime - System.currentTimeMillis(); + if (timeout > 0) { + totalLaunched.incrementAndGet(); + ListenableFuture result = execute(finalTaskCtx); + result = Futures.withTimeout(result, timeout, TimeUnit.MILLISECONDS, timeoutExecutor); + Futures.addCallback(result, new FutureCallback() { + @Override + public void onSuccess(@Nullable V result) { + logTask("Releasing", finalTaskCtx); + totalReleased.incrementAndGet(); + concurrencyLevel.decrementAndGet(); + finalTaskCtx.getFuture().set(result); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof TimeoutException) { + logTask("Expired During Execution", finalTaskCtx); + } else { + logTask("Failed", finalTaskCtx); + } + totalFailed.incrementAndGet(); + concurrencyLevel.decrementAndGet(); + finalTaskCtx.getFuture().setException(t); + log.debug("[{}] Failed to execute task: {}", finalTaskCtx.getId(), finalTaskCtx.getTask(), t); + } + }, callbackExecutor); + } else { + logTask("Expired Before Execution", finalTaskCtx); + totalExpired.incrementAndGet(); + concurrencyLevel.decrementAndGet(); + taskCtx.getFuture().setException(new TimeoutException()); + } + } else { + Thread.sleep(pollMs); + } + } catch (Throwable e) { + if (taskCtx != null) { + log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx, e); + totalFailed.incrementAndGet(); + concurrencyLevel.decrementAndGet(); + } else { + log.debug("Failed to queue task:", e); + } + } + } + log.info("Buffered rate executor thread stopped"); + } + + private void logTask(String action, AsyncTaskContext taskCtx) { + if (log.isTraceEnabled()) { + log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); + } else { + log.debug("[{}] {} task", taskCtx.getId(), action); + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java new file mode 100644 index 0000000000..a16568b6bd --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTask.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql.tmp; + +/** + * Created by ashvayka on 24.10.18. + */ +public interface AsyncTask { +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java new file mode 100644 index 0000000000..c3c98ee361 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/AsyncTaskContext.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql.tmp; + +import com.google.common.util.concurrent.SettableFuture; +import lombok.Data; + +import java.util.UUID; + +/** + * Created by ashvayka on 24.10.18. + */ +@Data +public class AsyncTaskContext { + + private final UUID id; + private final T task; + private final SettableFuture future; + private final long createTime; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java new file mode 100644 index 0000000000..7adc914885 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/tmp/BufferedRateExecutor.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.nosql.tmp; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Created by ashvayka on 24.10.18. + */ +public interface BufferedRateExecutor { + + F submit(T task); + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java deleted file mode 100644 index eab05b2fdb..0000000000 --- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.util; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.thingsboard.server.dao.exception.BufferLimitException; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -@Component -@Slf4j -@NoSqlAnyDao -public class BufferedRateLimiter implements AsyncRateLimiter { - - private final ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); - - private final int permitsLimit; - private final int maxPermitWaitTime; - private final AtomicInteger permits; - private final BlockingQueue queue; - - private final AtomicInteger maxQueueSize = new AtomicInteger(); - private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); - private final AtomicInteger totalGranted = new AtomicInteger(); - private final AtomicInteger totalReleased = new AtomicInteger(); - private final AtomicInteger totalRequested = new AtomicInteger(); - - public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, - @Value("${cassandra.query.concurrent_limit}") int permitsLimit, - @Value("${cassandra.query.permit_max_wait_time}") int maxPermitWaitTime) { - this.permitsLimit = permitsLimit; - this.maxPermitWaitTime = maxPermitWaitTime; - this.permits = new AtomicInteger(); - this.queue = new LinkedBlockingQueue<>(queueLimit); - } - - @Override - public ListenableFuture acquireAsync() { - totalRequested.incrementAndGet(); - if (queue.isEmpty()) { - if (permits.incrementAndGet() <= permitsLimit) { - if (permits.get() > maxGrantedPermissions.get()) { - maxGrantedPermissions.set(permits.get()); - } - totalGranted.incrementAndGet(); - return Futures.immediateFuture(null); - } - permits.decrementAndGet(); - } - - return putInQueue(); - } - - @Override - public void release() { - permits.decrementAndGet(); - totalReleased.incrementAndGet(); - reprocessQueue(); - } - - private void reprocessQueue() { - while (permits.get() < permitsLimit) { - if (permits.incrementAndGet() <= permitsLimit) { - if (permits.get() > maxGrantedPermissions.get()) { - maxGrantedPermissions.set(permits.get()); - } - LockedFuture lockedFuture = queue.poll(); - if (lockedFuture != null) { - totalGranted.incrementAndGet(); - lockedFuture.latch.countDown(); - } else { - permits.decrementAndGet(); - break; - } - } else { - permits.decrementAndGet(); - } - } - } - - private LockedFuture createLockedFuture() { - CountDownLatch latch = new CountDownLatch(1); - ListenableFuture future = pool.submit(() -> { - latch.await(); - return null; - }); - return new LockedFuture(latch, future, System.currentTimeMillis()); - } - - private ListenableFuture putInQueue() { - - int size = queue.size(); - if (size > maxQueueSize.get()) { - maxQueueSize.set(size); - } - - if (queue.remainingCapacity() > 0) { - try { - LockedFuture lockedFuture = createLockedFuture(); - if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { - lockedFuture.cancelFuture(); - return Futures.immediateFailedFuture(new BufferLimitException()); - } - if(permits.get() < permitsLimit) { - reprocessQueue(); - } - if(permits.get() < permitsLimit) { - reprocessQueue(); - } - return lockedFuture.future; - } catch (InterruptedException e) { - return Futures.immediateFailedFuture(new BufferLimitException()); - } - } - return Futures.immediateFailedFuture(new BufferLimitException()); - } - - @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") - public void printStats() { - int expiredCount = 0; - for (LockedFuture lockedFuture : queue) { - if (lockedFuture.isExpired()) { - lockedFuture.cancelFuture(); - expiredCount++; - } - } - log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " + - "totalPermits [{}] totalRequests [{}] totalReleased [{}]", - maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount, - permits.get(), queue.size(), - totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0)); - } - - private class LockedFuture { - final CountDownLatch latch; - final ListenableFuture future; - final long createTime; - - public LockedFuture(CountDownLatch latch, ListenableFuture future, long createTime) { - this.latch = latch; - this.future = future; - this.createTime = createTime; - } - - void cancelFuture() { - future.cancel(false); - latch.countDown(); - } - - boolean isExpired() { - return (System.currentTimeMillis() - createTime) > maxPermitWaitTime; - } - - } - - -} diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java deleted file mode 100644 index 366cefd5d8..0000000000 --- a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.dao.util; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.junit.Test; -import org.thingsboard.server.dao.exception.BufferLimitException; - -import javax.annotation.Nullable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - - -public class BufferedRateLimiterTest { - - @Test - public void finishedFutureReturnedIfPermitsAreGranted() { - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 10, 100); - ListenableFuture actual = limiter.acquireAsync(); - assertTrue(actual.isDone()); - } - - @Test - public void notFinishedFutureReturnedIfPermitsAreNotGranted() { - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 1, 100); - ListenableFuture actual1 = limiter.acquireAsync(); - ListenableFuture actual2 = limiter.acquireAsync(); - assertTrue(actual1.isDone()); - assertFalse(actual2.isDone()); - } - - @Test - public void failedFutureReturnedIfQueueIsfull() { - BufferedRateLimiter limiter = new BufferedRateLimiter(1, 1, 100); - ListenableFuture actual1 = limiter.acquireAsync(); - ListenableFuture actual2 = limiter.acquireAsync(); - ListenableFuture actual3 = limiter.acquireAsync(); - - assertTrue(actual1.isDone()); - assertFalse(actual2.isDone()); - assertTrue(actual3.isDone()); - try { - actual3.get(); - fail(); - } catch (Exception e) { - assertTrue(e instanceof ExecutionException); - Throwable actualCause = e.getCause(); - assertTrue(actualCause instanceof BufferLimitException); - assertEquals("Rate Limit Buffer is full", actualCause.getMessage()); - } - } - - @Test - public void releasedPermitTriggerTasksFromQueue() throws InterruptedException { - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); - ListenableFuture actual1 = limiter.acquireAsync(); - ListenableFuture actual2 = limiter.acquireAsync(); - ListenableFuture actual3 = limiter.acquireAsync(); - ListenableFuture actual4 = limiter.acquireAsync(); - assertTrue(actual1.isDone()); - assertTrue(actual2.isDone()); - assertFalse(actual3.isDone()); - assertFalse(actual4.isDone()); - limiter.release(); - TimeUnit.MILLISECONDS.sleep(100L); - assertTrue(actual3.isDone()); - assertFalse(actual4.isDone()); - limiter.release(); - TimeUnit.MILLISECONDS.sleep(100L); - assertTrue(actual4.isDone()); - } - - @Test - public void permitsReleasedInConcurrentMode() throws InterruptedException { - BufferedRateLimiter limiter = new BufferedRateLimiter(10, 2, 100); - AtomicInteger actualReleased = new AtomicInteger(); - AtomicInteger actualRejected = new AtomicInteger(); - ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); - for (int i = 0; i < 100; i++) { - ListenableFuture> submit = pool.submit(limiter::acquireAsync); - Futures.addCallback(submit, new FutureCallback>() { - @Override - public void onSuccess(@Nullable ListenableFuture result) { - Futures.addCallback(result, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - limiter.release(); - actualReleased.incrementAndGet(); - } - - @Override - public void onFailure(Throwable t) { - actualRejected.incrementAndGet(); - } - }); - } - - @Override - public void onFailure(Throwable t) { - } - }); - } - - TimeUnit.SECONDS.sleep(2); - assertTrue("Unexpected released count " + actualReleased.get(), - actualReleased.get() > 10 && actualReleased.get() < 20); - assertTrue("Unexpected rejected count " + actualRejected.get(), - actualRejected.get() > 80 && actualRejected.get() < 90); - - } - - -} \ No newline at end of file