return permit if request expired/canceled

This commit is contained in:
vparomskiy 2018-03-27 13:16:48 +03:00
parent 8b637c9e94
commit 54b272c04f
5 changed files with 82 additions and 15 deletions

View File

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.exception;
public class BufferLimitException extends RuntimeException {
private static final long serialVersionUID = 4513762009041887588L;
public BufferLimitException() {
super("Rate Limit Buffer is full");
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import org.thingsboard.server.dao.exception.BufferLimitException;
import org.thingsboard.server.dao.util.AsyncRateLimiter; import org.thingsboard.server.dao.util.AsyncRateLimiter;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -35,9 +36,15 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
private final ListenableFuture<Void> rateLimitFuture; private final ListenableFuture<Void> rateLimitFuture;
public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) {
this.rateLimitFuture = rateLimiter.acquireAsync(); this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> {
if (!(t instanceof BufferLimitException)) {
rateLimiter.release();
}
return Futures.immediateFailedFuture(t);
});
this.originalFuture = Futures.transform(rateLimitFuture, this.originalFuture = Futures.transform(rateLimitFuture,
(Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement)); (Function<Void, ResultSetFuture>) i -> executeAsyncWithRelease(rateLimiter, session, statement));
} }
@Override @Override
@ -108,10 +115,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
try { try {
ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture); ResultSetFuture resultSetFuture = Uninterruptibles.getUninterruptibly(originalFuture);
resultSetFuture.addListener(listener, executor); resultSetFuture.addListener(listener, executor);
} catch (CancellationException e) { } catch (CancellationException | ExecutionException e) {
cancel(false);
return;
} catch (ExecutionException e) {
Futures.immediateFailedFuture(e).addListener(listener, executor); Futures.immediateFailedFuture(e).addListener(listener, executor);
} }
}, executor); }, executor);

View File

@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.dao.exception.BufferLimitException;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -41,6 +42,9 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
private final AtomicInteger maxQueueSize = new AtomicInteger(); private final AtomicInteger maxQueueSize = new AtomicInteger();
private final AtomicInteger maxGrantedPermissions = new AtomicInteger(); private final AtomicInteger maxGrantedPermissions = new AtomicInteger();
private final AtomicInteger totalGranted = new AtomicInteger();
private final AtomicInteger totalReleased = new AtomicInteger();
private final AtomicInteger totalRequested = new AtomicInteger();
public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit, public BufferedRateLimiter(@Value("${cassandra.query.buffer_size}") int queueLimit,
@Value("${cassandra.query.concurrent_limit}") int permitsLimit, @Value("${cassandra.query.concurrent_limit}") int permitsLimit,
@ -53,11 +57,13 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
@Override @Override
public ListenableFuture<Void> acquireAsync() { public ListenableFuture<Void> acquireAsync() {
totalRequested.incrementAndGet();
if (queue.isEmpty()) { if (queue.isEmpty()) {
if (permits.incrementAndGet() <= permitsLimit) { if (permits.incrementAndGet() <= permitsLimit) {
if (permits.get() > maxGrantedPermissions.get()) { if (permits.get() > maxGrantedPermissions.get()) {
maxGrantedPermissions.set(permits.get()); maxGrantedPermissions.set(permits.get());
} }
totalGranted.incrementAndGet();
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
permits.decrementAndGet(); permits.decrementAndGet();
@ -69,6 +75,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
@Override @Override
public void release() { public void release() {
permits.decrementAndGet(); permits.decrementAndGet();
totalReleased.incrementAndGet();
reprocessQueue(); reprocessQueue();
} }
@ -80,6 +87,7 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
} }
LockedFuture lockedFuture = queue.poll(); LockedFuture lockedFuture = queue.poll();
if (lockedFuture != null) { if (lockedFuture != null) {
totalGranted.incrementAndGet();
lockedFuture.latch.countDown(); lockedFuture.latch.countDown();
} else { } else {
permits.decrementAndGet(); permits.decrementAndGet();
@ -112,17 +120,17 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
LockedFuture lockedFuture = createLockedFuture(); LockedFuture lockedFuture = createLockedFuture();
if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) { if (!queue.offer(lockedFuture, 1, TimeUnit.SECONDS)) {
lockedFuture.cancelFuture(); lockedFuture.cancelFuture();
return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); return Futures.immediateFailedFuture(new BufferLimitException());
} }
if(permits.get() < permitsLimit) { if(permits.get() < permitsLimit) {
reprocessQueue(); reprocessQueue();
} }
return lockedFuture.future; return lockedFuture.future;
} catch (InterruptedException e) { } catch (InterruptedException e) {
return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Task interrupted. Reject")); return Futures.immediateFailedFuture(new BufferLimitException());
} }
} }
return Futures.immediateFailedFuture(new IllegalStateException("Rate Limit Buffer is full. Reject")); return Futures.immediateFailedFuture(new BufferLimitException());
} }
@Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}") @Scheduled(fixedDelayString = "${cassandra.query.rate_limit_print_interval_ms}")
@ -134,8 +142,11 @@ public class BufferedRateLimiter implements AsyncRateLimiter {
expiredCount++; expiredCount++;
} }
} }
log.info("Permits maxBuffer is [{}] max concurrent [{}] expired [{}] current granted [{}]", maxQueueSize.getAndSet(0), log.info("Permits maxBuffer [{}] maxPermits [{}] expired [{}] currPermits [{}] currBuffer [{}] " +
maxGrantedPermissions.getAndSet(0), expiredCount, permits.get()); "totalPermits [{}] totalRequests [{}] totalReleased [{}]",
maxQueueSize.getAndSet(0), maxGrantedPermissions.getAndSet(0), expiredCount,
permits.get(), queue.size(),
totalGranted.getAndSet(0), totalRequested.getAndSet(0), totalReleased.getAndSet(0));
} }
private class LockedFuture { private class LockedFuture {

View File

@ -19,16 +19,17 @@ import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.UnsupportedFeatureException; import com.datastax.driver.core.exceptions.UnsupportedFeatureException;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.thingsboard.server.dao.exception.BufferLimitException;
import org.thingsboard.server.dao.util.AsyncRateLimiter; import org.thingsboard.server.dao.util.AsyncRateLimiter;
import java.util.concurrent.ExecutionException; import java.util.concurrent.*;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
@ -53,7 +54,7 @@ public class RateLimitedResultSetFutureTest {
@Test @Test
public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException { public void doNotReleasePermissionIfRateLimitFutureFailed() throws InterruptedException {
when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new IllegalArgumentException())); when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFailedFuture(new BufferLimitException()));
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
Thread.sleep(1000L); Thread.sleep(1000L);
verify(rateLimiter).acquireAsync(); verify(rateLimiter).acquireAsync();
@ -153,4 +154,29 @@ public class RateLimitedResultSetFutureTest {
verify(rateLimiter, times(1)).release(); verify(rateLimiter, times(1)).release();
} }
@Test
public void expiredQueryReturnPermit() throws InterruptedException, ExecutionException {
CountDownLatch latch = new CountDownLatch(1);
ListenableFuture<Void> future = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)).submit(() -> {
latch.await();
return null;
});
when(rateLimiter.acquireAsync()).thenReturn(future);
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
// TimeUnit.MILLISECONDS.sleep(200);
future.cancel(false);
latch.countDown();
try {
transform.get();
fail();
} catch (Exception e) {
assertTrue(e instanceof ExecutionException);
}
verify(rateLimiter, times(1)).acquireAsync();
verify(rateLimiter, times(1)).release();
}
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.util;
import com.google.common.util.concurrent.*; import com.google.common.util.concurrent.*;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.dao.exception.BufferLimitException;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -61,8 +62,8 @@ public class BufferedRateLimiterTest {
} catch (Exception e) { } catch (Exception e) {
assertTrue(e instanceof ExecutionException); assertTrue(e instanceof ExecutionException);
Throwable actualCause = e.getCause(); Throwable actualCause = e.getCause();
assertTrue(actualCause instanceof IllegalStateException); assertTrue(actualCause instanceof BufferLimitException);
assertEquals("Rate Limit Buffer is full. Reject", actualCause.getMessage()); assertEquals("Rate Limit Buffer is full", actualCause.getMessage());
} }
} }