Queue Implementation

This commit is contained in:
Andrii Shvaika 2020-03-10 12:34:05 +02:00
parent b7c652ab07
commit a39e8c3756
13 changed files with 473 additions and 5 deletions

View File

@ -0,0 +1,9 @@
package org.thingsboard.server;
import com.google.common.util.concurrent.ListenableFuture;
public interface TbQueueAdmin {
ListenableFuture<Void> createTopicIfNotExists(String topic);
}

View File

@ -2,7 +2,7 @@ package org.thingsboard.server;
public interface TbQueueCallback {
void onSuccess();
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}

View File

@ -4,7 +4,11 @@ import java.util.List;
public interface TbQueueConsumer<T extends TbQueueMsg> {
List<TbQueueMsg> poll();
String getTopic();
void subscribe();
List<T> poll(long durationInMillis);
void commit();

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Created by ashvayka on 05.10.18.
*/
public interface TbQueueHandler<Request extends TbQueueMsg, Response extends TbQueueMsg> {
ListenableFuture<Response> handle(Request request);
}

View File

@ -1,13 +1,12 @@
package org.thingsboard.server;
import java.util.Map;
import java.util.UUID;
public interface TbQueueMsg {
UUID getKey();
Map<String, byte[]> getHeaders();
TbQueueMsgHeaders getHeaders();
byte[] getData();
}

View File

@ -0,0 +1,8 @@
package org.thingsboard.server;
public interface TbQueueMsgHeaders {
byte[] put(String key, byte[] value);
byte[] get(String key);
}

View File

@ -4,6 +4,12 @@ import com.google.common.util.concurrent.ListenableFuture;
public interface TbQueueProducer<T extends TbQueueMsg> {
void init();
String getDefaultTopic();
ListenableFuture<TbQueueMsgMetadata> send(T msg, TbQueueCallback callback);
ListenableFuture<TbQueueMsgMetadata> send(String topic, T msg, TbQueueCallback callback);
}

View File

@ -4,6 +4,6 @@ import com.google.common.util.concurrent.ListenableFuture;
public interface TbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> {
ListenableFuture<Response> post(String key, Request request);
ListenableFuture<Response> send(String key, Request request);
}

View File

@ -0,0 +1,8 @@
package org.thingsboard.server;
public interface TbQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> {
void init(TbQueueHandler<Request, Response> handler);
void stop();
}

View File

@ -0,0 +1,32 @@
package org.thingsboard.server.common;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class AbstractTbQueueTemplate {
protected static final String REQUEST_ID_HEADER = "requestId";
protected static final String RESPONSE_TOPIC_HEADER = "responseTopic";
protected byte[] uuidToBytes(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(16);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
protected static UUID bytesToUuid(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
return new UUID(firstLong, secondLong);
}
protected byte[] stringToBytes(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
protected String bytesToString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Created by ashvayka on 05.10.18.
*/
public class AsyncCallbackTemplate {
public static <T> void withCallbackAndTimeout(ListenableFuture<T> future,
Consumer<T> onSuccess,
Consumer<Throwable> onFailure,
long timeoutInMs,
ScheduledExecutorService timeoutExecutor,
Executor callbackExecutor) {
future = Futures.withTimeout(future, timeoutInMs, TimeUnit.MILLISECONDS, timeoutExecutor);
withCallback(future, onSuccess, onFailure, callbackExecutor);
}
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure, Executor executor) {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
try {
onSuccess.accept(result);
} catch (Throwable th) {
onFailure(th);
}
}
@Override
public void onFailure(Throwable t) {
onFailure.accept(t);
}
};
if (executor != null) {
Futures.addCallback(future, callback, executor);
} else {
Futures.addCallback(future, callback);
}
}
}

View File

@ -0,0 +1,172 @@
package org.thingsboard.server.common;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.InterruptException;
import org.thingsboard.server.TbQueueAdmin;
import org.thingsboard.server.TbQueueCallback;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueMsg;
import org.thingsboard.server.TbQueueMsgMetadata;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
@Slf4j
public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate
implements TbQueueRequestTemplate<Request, Response> {
private final TbQueueAdmin queueAdmin;
private final TbQueueProducer<Request> requestTemplate;
private final TbQueueConsumer<Response> responseTemplate;
private final ConcurrentMap<UUID, DefaultTbQueueRequestTemplate.ResponseMetaData<Response>> pendingRequests;
private final boolean internalExecutor;
private final ExecutorService executor;
private final long maxRequestTimeout;
private final long maxPendingRequests;
private final long pollInterval;
private volatile long tickTs = 0L;
private volatile long tickSize = 0L;
private volatile boolean stopped = false;
@Builder
public DefaultTbQueueRequestTemplate(TbQueueAdmin queueAdmin,
TbQueueProducer<Request> requestTemplate,
TbQueueConsumer<Response> responseTemplate,
long maxRequestTimeout,
long maxPendingRequests,
long pollInterval,
ExecutorService executor) {
this.queueAdmin = queueAdmin;
this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate;
this.pendingRequests = new ConcurrentHashMap<>();
this.maxRequestTimeout = maxRequestTimeout;
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
if (executor != null) {
internalExecutor = false;
this.executor = executor;
} else {
internalExecutor = true;
this.executor = Executors.newSingleThreadExecutor();
}
}
public void init() {
queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
this.requestTemplate.init();
tickTs = System.currentTimeMillis();
responseTemplate.subscribe();
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
try {
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
log.trace("Polling responses completed, consumer records count [{}]", responses.size());
}
responses.forEach(response -> {
log.trace("Received response to Kafka Template request: {}", response);
byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
UUID requestId = null;
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header and body", response);
} else {
requestId = bytesToUuid(requestIdHeader);
log.trace("[{}] Response received", requestId);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
} else {
expectedResponse.future.set(response);
}
}
});
responseTemplate.commit();
tickTs = System.currentTimeMillis();
tickSize = pendingRequests.size();
if (nextCleanupMs < tickTs) {
//cleanup;
pendingRequests.forEach((key, value) -> {
if (value.expTime < tickTs) {
ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
if (staleRequest != null) {
log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
staleRequest.future.setException(new TimeoutException());
}
}
});
nextCleanupMs = tickTs + maxRequestTimeout;
}
} catch (InterruptException ie) {
if (!stopped) {
log.warn("Fetching data from kafka was interrupted.", ie);
}
} catch (Throwable e) {
log.warn("Failed to obtain responses from queue.", e);
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new responses", e2);
}
}
}
});
}
public void stop() {
stopped = true;
if (internalExecutor) {
executor.shutdownNow();
}
}
@Override
public ListenableFuture<Response> send(String key, Request request) {
if (tickSize > maxPendingRequests) {
return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
}
UUID requestId = UUID.randomUUID();
request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
SettableFuture<Response> future = SettableFuture.create();
ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
requestTemplate.send(request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.trace("[{}] Request sent: {}", requestId, metadata);
}
@Override
public void onFailure(Throwable t) {
pendingRequests.remove(requestId);
future.setException(t);
}
});
return future;
}
private static class ResponseMetaData<T> {
private final long expTime;
private final SettableFuture<T> future;
ResponseMetaData(long ts, SettableFuture<T> future) {
this.expTime = ts;
this.future = future;
}
}
}

View File

@ -0,0 +1,137 @@
package org.thingsboard.server.common;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.InterruptException;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueHandler;
import org.thingsboard.server.TbQueueMsg;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueResponseTemplate;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response extends TbQueueMsg> extends AbstractTbQueueTemplate
implements TbQueueResponseTemplate<Request, Response> {
private final TbQueueConsumer<Request> requestTemplate;
private final TbQueueProducer<Response> responseTemplate;
private final ConcurrentMap<UUID, String> pendingRequests;
private final ExecutorService loopExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor;
private final int maxPendingRequests;
private final long requestTimeout;
private final long pollInterval;
private volatile boolean stopped = false;
private final AtomicInteger pendingRequestCount = new AtomicInteger();
@Builder
public DefaultTbQueueResponseTemplate(TbQueueConsumer<Request> requestTemplate,
TbQueueProducer<Response> responseTemplate,
TbQueueHandler<Request, Response> handler,
long pollInterval,
long requestTimeout,
int maxPendingRequests,
ExecutorService executor) {
this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate;
this.pendingRequests = new ConcurrentHashMap<>();
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
this.requestTimeout = requestTimeout;
this.callbackExecutor = executor;
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
this.loopExecutor = Executors.newSingleThreadExecutor();
}
@Override
public void init(TbQueueHandler<Request, Response> handler) {
this.responseTemplate.init();
requestTemplate.subscribe();
loopExecutor.submit(() -> {
while (!stopped) {
try {
while (pendingRequestCount.get() >= maxPendingRequests) {
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e) {
log.trace("Failed to wait until the server has capacity to handle new requests", e);
}
}
List<Request> requests = requestTemplate.poll(pollInterval);
requests.forEach(request -> {
byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header", request);
return;
}
byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
if (responseTopicHeader == null) {
log.error("[{}] Missing response topic in header", request);
return;
}
UUID requestId = bytesToUuid(requestIdHeader);
String responseTopic = bytesToString(responseTopicHeader);
try {
pendingRequestCount.getAndIncrement();
AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
response -> {
pendingRequestCount.decrementAndGet();
response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
responseTemplate.send(responseTopic, response, null);
},
e -> {
pendingRequestCount.decrementAndGet();
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
} else {
log.trace("[{}] Failed to process the request: {}", requestId, request, e);
}
},
requestTimeout,
timeoutExecutor,
callbackExecutor);
} catch (Throwable e) {
pendingRequestCount.decrementAndGet();
log.warn("[{}] Failed to process the request: {}", requestId, request, e);
}
});
requestTemplate.commit();
} catch (InterruptException ie) {
if (!stopped) {
log.warn("Fetching data from queue was interrupted.", ie);
}
} catch (Throwable e) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollInterval);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
});
}
public void stop() {
stopped = true;
if (timeoutExecutor != null) {
timeoutExecutor.shutdownNow();
}
if (loopExecutor != null) {
loopExecutor.shutdownNow();
}
}
}