diff --git a/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java new file mode 100644 index 0000000000..3334dc62a9 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/exception/BufferLimitException.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.exception; + +public class BufferLimitException extends RuntimeException { + + private static final long serialVersionUID = 4513762009041887588L; + + public BufferLimitException() { + super("Rate Limit Buffer is full"); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java index 2674c6ddea..d2505632d7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import org.thingsboard.server.dao.exception.BufferLimitException; import org.thingsboard.server.dao.util.AsyncRateLimiter; import javax.annotation.Nullable; @@ -35,9 +36,15 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { private final ListenableFuture rateLimitFuture; public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { - this.rateLimitFuture = rateLimiter.acquireAsync(); + this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> { + if (!(t instanceof BufferLimitException)) { + rateLimiter.release(); + } + return Futures.immediateFailedFuture(t); + }); this.originalFuture = Futures.transform(rateLimitFuture, (Function) i -> executeAsyncWithRelease(rateLimiter, session, statement)); + } @Override @@ -108,10 +115,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { try { ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); resultSetFuture.addListener(listener, executor); - } catch (CancellationException e) { - cancel(false); - return; - } catch (ExecutionException e) { + } catch (CancellationException | ExecutionException e) { Futures.immediateFailedFuture(e).addListener(listener, executor); } }, executor); diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java index 2acd623a37..03eb46f1ab 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/BufferedRateLimiter.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.thingsboard.server.dao.exception.BufferLimitException; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter { private final AtomicInteger maxQueueSize = new AtomicInteger(); private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); + private final AtomicInteger totalGranted = new AtomicInteger(); + private final AtomicInteger totalReleased = new AtomicInteger(); + private final AtomicInteger totalRequested = new AtomicInteger(); public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, @Value("${cassandra.query.concurrent_limit}") int permitsLimit, @@ -53,11 +57,13 @@ public class BufferedRateLimiter implements AsyncRateLimiter { @Override public ListenableFuture acquireAsync() { + totalRequested.incrementAndGet(); if (queue.isEmpty()) { if (permits.incrementAndGet() <= permitsLimit) { if (permits.get() > maxGrantedPermissions.get()) { maxGrantedPermissions.set(permits.get()); } + totalGranted.incrementAndGet(); return Futures.immediateFuture(null); } permits.decrementAndGet(); @@ -69,6 +75,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter { @Override public void release() { permits.decrementAndGet(); + totalReleased.incrementAndGet(); reprocessQueue(); } @@ -80,6 +87,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter { } LockedFuture lockedFuture = queue.poll(); if (lockedFuture != null) { + totalGranted.incrementAndGet(); lockedFuture.latch.countDown(); } else { permits.decrementAndGet(); @@ -112,17 +120,17 @@ public class BufferedRateLimiter implements AsyncRateLimiter { LockedFuture lockedFuture = createLockedFuture(); if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { lockedFuture.cancelFuture(); - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + return Futures.immediateFailedFuture(new BufferLimitException()); } if(permits.get() < permitsLimit) { reprocessQueue(); } return lockedFuture.future; } catch (InterruptedException e) { - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); + return Futures.immediateFailedFuture(new BufferLimitException()); } } - return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); + return Futures.immediateFailedFuture(new BufferLimitException()); } @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @@ -134,8 +142,11 @@ public class BufferedRateLimiter implements AsyncRateLimiter { expiredCount++; } } - log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), - maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); + log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " + + "totalPermits [{}] totalRequests [{}] totalReleased [{}]", + maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount, + permits.get(), queue.size(), + totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0)); } private class LockedFuture { diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java index fa62c2b9b0..f49668d3fd 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java @@ -19,16 +19,17 @@ import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.UnsupportedFeatureException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.Answer; +import org.thingsboard.server.dao.exception.BufferLimitException; import org.thingsboard.server.dao.util.AsyncRateLimiter; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -53,7 +54,7 @@ public class RateLimitedResultSetFutureTest { @Test public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { - when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); + when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new BufferLimitException())); resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); Thread.sleep(1000L); verify(rateLimiter).acquireAsync(); @@ -153,4 +154,29 @@ public class RateLimitedResultSetFutureTest { verify(rateLimiter, times(1)).release(); } + @Test + public void expiredQueryReturnPermit() throws InterruptedException, ExecutionException { + CountDownLatch latch = new CountDownLatch(1); + ListenableFuture future = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> { + latch.await(); + return null; + }); + when(rateLimiter.acquireAsync()).thenReturn(future); + resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); + + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); +// TimeUnit.MILLISECONDS.sleep(200); + future.cancel(false); + latch.countDown(); + + try { + transform.get(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof ExecutionException); + } + verify(rateLimiter, times(1)).acquireAsync(); + verify(rateLimiter, times(1)).release(); + } + } \ No newline at end of file diff --git a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java index 5bfc3b6e95..67c3ce8d73 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/util/BufferedRateLimiterTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.util; import com.google.common.util.concurrent.*; import org.junit.Test; +import org.thingsboard.server.dao.exception.BufferLimitException; import javax.annotation.Nullable; import java.util.concurrent.ExecutionException; @@ -61,8 +62,8 @@ public class BufferedRateLimiterTest { } catch (Exception e) { assertTrue(e instanceof ExecutionException); Throwable actualCause = e.getCause(); - assertTrue(actualCause instanceof IllegalStateException); - assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); + assertTrue(actualCause instanceof BufferLimitException); + assertEquals("Rate Limit Buffer is full", actualCause.getMessage()); } }