queue request template: sleep on exception shortened according to the stopwatch. test adjusted

This commit is contained in:
Sergey Matvienko 2021-05-07 16:34:35 +03:00
parent e2aa4be741
commit 9daa43a115
2 changed files with 12 additions and 10 deletions

View File

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin;
@ -95,11 +96,13 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
void mainLoop() { void mainLoop() {
while (!stopped) { while (!stopped) {
TbStopWatch sw = TbStopWatch.startNew();
try { try {
fetchAndProcessResponses(); fetchAndProcessResponses();
} catch (Throwable e) { } catch (Throwable e) {
log.warn("Failed to obtain and process responses from queue. Going to sleep " + pollInterval + "ms", e); long sleepNanos = TimeUnit.MILLISECONDS.toNanos(this.pollInterval) - sw.stopAndGetTotalTimeNanos();
sleep(); log.warn("Failed to obtain and process responses from queue. Going to sleep " + sleepNanos + "ns", e);
sleep(sleepNanos);
} }
} }
} }
@ -149,9 +152,8 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
return responseTemplate.poll(pollInterval); return responseTemplate.poll(pollInterval);
} }
void sleep() { void sleep(long nanos) {
Thread.yield(); LockSupport.parkNanos(nanos);
LockSupport.parkNanos(1);
} }
void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) { void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long currentNs) {

View File

@ -54,13 +54,13 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals; import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willDoNothing; import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.longThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -141,7 +141,7 @@ public class DefaultTbQueueRequestTemplateTest {
public void givenMainLoop_whenLoopFewTimes_thenVerifyInvocationCount() throws InterruptedException { public void givenMainLoop_whenLoopFewTimes_thenVerifyInvocationCount() throws InterruptedException {
executor = inst.createExecutor(); executor = inst.createExecutor();
CountDownLatch latch = new CountDownLatch(5); CountDownLatch latch = new CountDownLatch(5);
willDoNothing().given(inst).sleep(); willDoNothing().given(inst).sleep(anyLong());
willAnswer(invocation -> { willAnswer(invocation -> {
if (latch.getCount() == 1) { if (latch.getCount() == 1) {
inst.stop(); //stop the loop in natural way inst.stop(); //stop the loop in natural way
@ -158,7 +158,7 @@ public class DefaultTbQueueRequestTemplateTest {
latch.await(10, TimeUnit.SECONDS); latch.await(10, TimeUnit.SECONDS);
verify(inst, times(5)).fetchAndProcessResponses(); verify(inst, times(5)).fetchAndProcessResponses();
verify(inst, times(2)).sleep(); verify(inst, times(2)).sleep(longThat(lessThan(TimeUnit.MILLISECONDS.toNanos(inst.pollInterval))));
} }
@Test @Test