test: added failed test for FetchAndProcessResponses when request removed as staled too early DefaultTbQueueRequestTemplate
This commit is contained in:
		
							parent
							
								
									d5fffa5002
								
							
						
					
					
						commit
						9666986156
					
				@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.SettableFuture;
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 | 
			
		||||
@ -47,15 +48,16 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
    private final TbQueueAdmin queueAdmin;
 | 
			
		||||
    private final TbQueueProducer<Request> requestTemplate;
 | 
			
		||||
    private final TbQueueConsumer<Response> responseTemplate;
 | 
			
		||||
    private final ConcurrentMap<UUID, DefaultTbQueueRequestTemplate.ResponseMetaData<Response>> pendingRequests;
 | 
			
		||||
    final ConcurrentMap<UUID, DefaultTbQueueRequestTemplate.ResponseMetaData<Response>> pendingRequests;
 | 
			
		||||
    final boolean internalExecutor;
 | 
			
		||||
    private final ExecutorService executor;
 | 
			
		||||
    private final long maxRequestTimeout;
 | 
			
		||||
    private final long maxPendingRequests;
 | 
			
		||||
    private final long pollInterval;
 | 
			
		||||
    final ExecutorService executor;
 | 
			
		||||
    final long maxRequestTimeout;
 | 
			
		||||
    final long maxPendingRequests;
 | 
			
		||||
    final long pollInterval;
 | 
			
		||||
    volatile long tickTs = 0L;
 | 
			
		||||
    volatile long tickSize = 0L;
 | 
			
		||||
    volatile boolean stopped = false;
 | 
			
		||||
    long nextCleanupMs = 0L;
 | 
			
		||||
 | 
			
		||||
    private MessagesStats messagesStats;
 | 
			
		||||
 | 
			
		||||
@ -75,44 +77,26 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        this.maxPendingRequests = maxPendingRequests;
 | 
			
		||||
        this.pollInterval = pollInterval;
 | 
			
		||||
        this.internalExecutor = (executor == null);
 | 
			
		||||
        this.executor = internalExecutor
 | 
			
		||||
                ? Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-request-template-" + responseTemplate.getTopic()))
 | 
			
		||||
                : executor;
 | 
			
		||||
        this.executor = internalExecutor ? createExecutor() : executor;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ExecutorService createExecutor() {
 | 
			
		||||
        return Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-request-template-" + responseTemplate.getTopic()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init() {
 | 
			
		||||
        queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
 | 
			
		||||
        requestTemplate.init();
 | 
			
		||||
        tickTs = System.currentTimeMillis();
 | 
			
		||||
        tickTs = getCurrentTime();
 | 
			
		||||
        responseTemplate.subscribe();
 | 
			
		||||
        executor.submit(this::fetchAndProcessResponses);
 | 
			
		||||
        executor.submit(this::mainLoop);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void fetchAndProcessResponses() {
 | 
			
		||||
        long nextCleanupMs = 0L;
 | 
			
		||||
    void mainLoop() {
 | 
			
		||||
        while (!stopped) {
 | 
			
		||||
            try {
 | 
			
		||||
                final int pendingRequestsCount = pendingRequests.size();
 | 
			
		||||
                log.trace("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingRequestsCount);
 | 
			
		||||
                List<Response> responses = doPoll(); //poll js responses
 | 
			
		||||
                log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}]", responseTemplate.getTopic(), pendingRequestsCount, responses.size());
 | 
			
		||||
                responses.forEach(this::processResponse);
 | 
			
		||||
                responseTemplate.commit();
 | 
			
		||||
                tickTs = System.currentTimeMillis();
 | 
			
		||||
                tickSize = pendingRequests.size();
 | 
			
		||||
                if (nextCleanupMs < tickTs) {
 | 
			
		||||
                    //cleanup;
 | 
			
		||||
                    pendingRequests.forEach((key, value) -> {
 | 
			
		||||
                        if (value.expTime < tickTs) {
 | 
			
		||||
                            ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
 | 
			
		||||
                            if (staleRequest != null) {
 | 
			
		||||
                                setTimeoutException(key, staleRequest);
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                    });
 | 
			
		||||
                    nextCleanupMs = tickTs + maxRequestTimeout;
 | 
			
		||||
                }
 | 
			
		||||
                fetchAndProcessResponses();
 | 
			
		||||
            } catch (Throwable e) {
 | 
			
		||||
                log.warn("Failed to obtain responses from queue. Going to sleep " + pollInterval + "ms", e);
 | 
			
		||||
                sleep();
 | 
			
		||||
@ -120,6 +104,36 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void fetchAndProcessResponses() {
 | 
			
		||||
        final int pendingRequestsCount = pendingRequests.size();
 | 
			
		||||
        log.info("Starting template pool topic {}, for pendingRequests {}", responseTemplate.getTopic(), pendingRequestsCount);
 | 
			
		||||
        List<Response> responses = doPoll(); //poll js responses
 | 
			
		||||
        //if (responses.size() > 0) {
 | 
			
		||||
        log.trace("Completed template poll topic {}, for pendingRequests [{}], received [{}]", responseTemplate.getTopic(), pendingRequestsCount, responses.size());
 | 
			
		||||
        //}
 | 
			
		||||
        responses.forEach(this::processResponse); //this can take a long time
 | 
			
		||||
        responseTemplate.commit();
 | 
			
		||||
        tickTs = getCurrentTime();
 | 
			
		||||
        tickSize = pendingRequests.size();
 | 
			
		||||
        if (nextCleanupMs < tickTs) {
 | 
			
		||||
            //cleanup;
 | 
			
		||||
            pendingRequests.forEach((key, value) -> {
 | 
			
		||||
                if (value.expTime < tickTs) {
 | 
			
		||||
                    ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
 | 
			
		||||
                    if (staleRequest != null) {
 | 
			
		||||
                        setTimeoutException(key, staleRequest, tickTs);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            setupNextCleanup();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void setupNextCleanup() {
 | 
			
		||||
        nextCleanupMs = tickTs + maxRequestTimeout;
 | 
			
		||||
        log.info("setupNextCleanup {}", nextCleanupMs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    List<Response> doPoll() {
 | 
			
		||||
        return responseTemplate.poll(pollInterval);
 | 
			
		||||
    }
 | 
			
		||||
@ -132,8 +146,13 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest) {
 | 
			
		||||
        log.info("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
 | 
			
		||||
    void setTimeoutException(UUID key, ResponseMetaData<Response> staleRequest, long tickTs) {
 | 
			
		||||
        if (tickTs >= staleRequest.getSubmitTime() + staleRequest.getTimeout()) {
 | 
			
		||||
            log.info("Request timeout detected, tickTs [{}], {}, key [{}]", tickTs, staleRequest, key);
 | 
			
		||||
        } else {
 | 
			
		||||
            log.error("Request timeout detected, tickTs [{}], {}, key [{}]", tickTs, staleRequest, key);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        staleRequest.future.setException(new TimeoutException());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -144,10 +163,10 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
            log.error("[{}] Missing requestId in header and body", response);
 | 
			
		||||
        } else {
 | 
			
		||||
            requestId = bytesToUuid(requestIdHeader);
 | 
			
		||||
            log.trace("[{}] Response received: {}", requestId, response);
 | 
			
		||||
            log.trace("[{}] Response received: {}", requestId, String.valueOf(response).replace("\n", " ")); //TODO remove overhead
 | 
			
		||||
            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
 | 
			
		||||
            if (expectedResponse == null) {
 | 
			
		||||
                log.warn("[{}] Invalid or stale request, response: {}", requestId, response);
 | 
			
		||||
                log.warn("[{}] Invalid or stale request, response: {}", requestId, String.valueOf(response).replace("\n", " "));
 | 
			
		||||
            } else {
 | 
			
		||||
                expectedResponse.future.set(response);
 | 
			
		||||
            }
 | 
			
		||||
@ -184,11 +203,22 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
        UUID requestId = UUID.randomUUID();
 | 
			
		||||
        request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
 | 
			
		||||
        request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
 | 
			
		||||
        request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
 | 
			
		||||
        long currentTime = getCurrentTime();
 | 
			
		||||
        request.getHeaders().put(REQUEST_TIME, longToBytes(currentTime));
 | 
			
		||||
        SettableFuture<Response> future = SettableFuture.create();
 | 
			
		||||
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
 | 
			
		||||
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future, currentTime, maxRequestTimeout);
 | 
			
		||||
        log.info("pending {}", responseMetaData);
 | 
			
		||||
        pendingRequests.putIfAbsent(requestId, responseMetaData);
 | 
			
		||||
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);
 | 
			
		||||
        sendToRequestTemplate(request, requestId, future, responseMetaData);
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    long getCurrentTime() {
 | 
			
		||||
        return System.currentTimeMillis();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
 | 
			
		||||
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, String.valueOf(request).replace("\n", " "));
 | 
			
		||||
        if (messagesStats != null) {
 | 
			
		||||
            messagesStats.incrementTotal();
 | 
			
		||||
        }
 | 
			
		||||
@ -198,7 +228,7 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
                if (messagesStats != null) {
 | 
			
		||||
                    messagesStats.incrementSuccessful();
 | 
			
		||||
                }
 | 
			
		||||
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
 | 
			
		||||
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, String.valueOf(request).replace("\n", " "));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
@ -210,17 +240,32 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response
 | 
			
		||||
                future.setException(t);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
        return future;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static class ResponseMetaData<T> {
 | 
			
		||||
    @Getter
 | 
			
		||||
    static class ResponseMetaData<T> {
 | 
			
		||||
        private final long submitTime;
 | 
			
		||||
        private final long timeout;
 | 
			
		||||
        private final long expTime;
 | 
			
		||||
        private final SettableFuture<T> future;
 | 
			
		||||
 | 
			
		||||
        ResponseMetaData(long ts, SettableFuture<T> future) {
 | 
			
		||||
        ResponseMetaData(long ts, SettableFuture<T> future, long submitTime, long timeout) {
 | 
			
		||||
            this.submitTime = submitTime;
 | 
			
		||||
            this.timeout = timeout;
 | 
			
		||||
            this.expTime = ts;
 | 
			
		||||
            this.future = future;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        @Override
 | 
			
		||||
        public String toString() {
 | 
			
		||||
            return "ResponseMetaData{" +
 | 
			
		||||
                    "submitTime=" + submitTime +
 | 
			
		||||
                    ", calculatedExpTime=" + (submitTime + timeout) +
 | 
			
		||||
                    ", expTime=" + expTime +
 | 
			
		||||
                    ", deltaMs=" + (expTime - submitTime) +
 | 
			
		||||
                    ", future=" + future +
 | 
			
		||||
                    '}';
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -29,23 +29,29 @@
 | 
			
		||||
 * OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT  MAY DESCRIBE, IN WHOLE OR IN PART.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.queue.common;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.junit.After;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.junit.runner.RunWith;
 | 
			
		||||
import org.mockito.ArgumentCaptor;
 | 
			
		||||
import org.mockito.Mock;
 | 
			
		||||
import org.mockito.runners.MockitoJUnitRunner;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueAdmin;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueConsumer;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsg;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueMsgHeaders;
 | 
			
		||||
import org.thingsboard.server.queue.TbQueueProducer;
 | 
			
		||||
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.CountDownLatch;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicLong;
 | 
			
		||||
 | 
			
		||||
import static org.hamcrest.Matchers.equalTo;
 | 
			
		||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 | 
			
		||||
import static org.junit.Assert.assertEquals;
 | 
			
		||||
import static org.junit.Assert.assertFalse;
 | 
			
		||||
import static org.junit.Assert.assertNotEquals;
 | 
			
		||||
@ -54,12 +60,17 @@ import static org.mockito.BDDMockito.willAnswer;
 | 
			
		||||
import static org.mockito.BDDMockito.willDoNothing;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.Matchers.any;
 | 
			
		||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 | 
			
		||||
import static org.mockito.Mockito.atLeastOnce;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
import static org.mockito.Mockito.spy;
 | 
			
		||||
import static org.mockito.Mockito.times;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
 | 
			
		||||
import static org.hamcrest.MatcherAssert.assertThat;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RunWith(MockitoJUnitRunner.class)
 | 
			
		||||
public class DefaultTbQueueRequestTemplateTest {
 | 
			
		||||
 | 
			
		||||
@ -74,9 +85,9 @@ public class DefaultTbQueueRequestTemplateTest {
 | 
			
		||||
 | 
			
		||||
    ExecutorService executor;
 | 
			
		||||
    String topic = "js-responses-tb-node-0";
 | 
			
		||||
    long maxRequestTimeout = 20;
 | 
			
		||||
    long maxRequestTimeout = 10;
 | 
			
		||||
    long maxPendingRequests = 32;
 | 
			
		||||
    long pollInterval = 25;
 | 
			
		||||
    long pollInterval = 5;
 | 
			
		||||
 | 
			
		||||
    DefaultTbQueueRequestTemplate inst;
 | 
			
		||||
 | 
			
		||||
@ -171,20 +182,49 @@ public class DefaultTbQueueRequestTemplateTest {
 | 
			
		||||
            assertFalse(inst.send(getRequestMsgMock()).isDone()); //SettableFuture future - pending only
 | 
			
		||||
        }
 | 
			
		||||
        for (int i = 0; i < msgOverflowCount; i++) {
 | 
			
		||||
            assertTrue("max pending requests overflow", inst.send(getRequestMsgMock()).isDone()); //overflow, immediate failed future
 | 
			
		||||
            assertFalse("max pending requests overflow", inst.send(getRequestMsgMock()).isDone()); //overflow, immediate failed future
 | 
			
		||||
        }
 | 
			
		||||
        assertEquals(inst.maxPendingRequests, inst.pendingRequests.size());
 | 
			
		||||
        assertThat(inst.pendingRequests.size(), equalTo(inst.maxPendingRequests));
 | 
			
		||||
        verify(inst, times((int) inst.maxPendingRequests)).sendToRequestTemplate(any(), any(), any(), any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenNothing_whenFetchAndProcessResponsesWithTimeout_thenFail() {
 | 
			
		||||
    public void givenNothing_whenSendAndFetchAndProcessResponsesWithTimeout_thenFail() {
 | 
			
		||||
        //given
 | 
			
		||||
        AtomicLong currentTime = new AtomicLong();
 | 
			
		||||
        willAnswer(x -> {
 | 
			
		||||
            log.info("currentTime={}", currentTime.get());
 | 
			
		||||
            return currentTime.get();
 | 
			
		||||
        }).given(inst).getCurrentTime();
 | 
			
		||||
        inst.init();
 | 
			
		||||
        inst.setupNextCleanup();
 | 
			
		||||
        willReturn(Collections.emptyList()).given(inst).doPoll();
 | 
			
		||||
        willDoNothing().given(inst).processResponse(any());
 | 
			
		||||
 | 
			
		||||
        //when
 | 
			
		||||
        for (int i = 0; i <= inst.maxRequestTimeout*2; i++) {
 | 
			
		||||
            currentTime.incrementAndGet();
 | 
			
		||||
            assertFalse(inst.send(getRequestMsgMock()).isDone()); //SettableFuture future - pending only
 | 
			
		||||
            if (i % (inst.maxRequestTimeout * 3 / 2) == 0) {
 | 
			
		||||
                inst.fetchAndProcessResponses();
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        //then
 | 
			
		||||
        ArgumentCaptor<DefaultTbQueueRequestTemplate.ResponseMetaData> argumentCaptorResp = ArgumentCaptor.forClass(DefaultTbQueueRequestTemplate.ResponseMetaData.class);
 | 
			
		||||
        ArgumentCaptor<UUID> argumentCaptorUUID = ArgumentCaptor.forClass(UUID.class);
 | 
			
		||||
        ArgumentCaptor<Long> argumentCaptorLong = ArgumentCaptor.forClass(Long.class);
 | 
			
		||||
        verify(inst, atLeastOnce()).setTimeoutException(argumentCaptorUUID.capture(), argumentCaptorResp.capture(), argumentCaptorLong.capture());
 | 
			
		||||
 | 
			
		||||
        List<DefaultTbQueueRequestTemplate.ResponseMetaData> responseMetaDataList = argumentCaptorResp.getAllValues();
 | 
			
		||||
        List<Long> tickTsList = argumentCaptorLong.getAllValues();
 | 
			
		||||
        for (int i = 0; i < responseMetaDataList.size(); i++) {
 | 
			
		||||
            assertThat("tickTs >= calculatedExpTime", tickTsList.get(i), greaterThanOrEqualTo(responseMetaDataList.get(i).getSubmitTime() + responseMetaDataList.get(i).getTimeout()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    TbQueueMsg getRequestMsgMock() {
 | 
			
		||||
        TbQueueMsg requestMsg = mock(TbQueueMsg.class);
 | 
			
		||||
        willReturn(mock(TbQueueMsgHeaders.class)).given(requestMsg).getHeaders();
 | 
			
		||||
        return requestMsg;
 | 
			
		||||
        return mock(TbQueueMsg.class, RETURNS_DEEP_STUBS);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user