EDQS: human readable response on failed to send kafka msg

This commit is contained in:
Andrii Landiak 2025-06-18 16:42:45 +03:00
parent 6314cb7ee5
commit a843c86227
3 changed files with 49 additions and 5 deletions

View File

@ -17,11 +17,12 @@ package org.thingsboard.server.queue;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Created by ashvayka on 05.10.18.
*/
public interface TbQueueHandler<Request extends TbQueueMsg, Response extends TbQueueMsg> {
ListenableFuture<Response> handle(Request request);
default Response constructErrorResponseMsg(Request request, Throwable cause) {
return null;
}
}

View File

@ -203,6 +203,25 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
});
}
@Override
public TbProtoQueueMsg<FromEdqsMsg> constructErrorResponseMsg(TbProtoQueueMsg<ToEdqsMsg> request, Throwable e) {
EdqsResponse response = new EdqsResponse();
String errorMessage;
if (e instanceof org.apache.kafka.common.errors.RecordTooLargeException) {
errorMessage = "Result set is too large";
} else if (e instanceof IllegalArgumentException || e instanceof NullPointerException) {
errorMessage = "Invalid request format or missing data: " + ExceptionUtil.getMessage(e);
} else {
errorMessage = ExceptionUtil.getMessage(e);
}
response.setError(errorMessage);
return new TbProtoQueueMsg<>(request.getKey(), FromEdqsMsg.newBuilder()
.setResponseMsg(TransportProtos.EdqsResponseMsg.newBuilder()
.setValue(JacksonUtil.toString(response))
.build())
.build(), request.getHeaders());
}
private EdqsResponse processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
EdqsResponse response = new EdqsResponse();
try {

View File

@ -21,9 +21,11 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
@ -119,8 +121,20 @@ public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Respon
response -> {
pendingRequestCount.decrementAndGet();
response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
responseProducer.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
stats.incrementSuccessful();
TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(responseTopic).build();
responseProducer.send(tpi, response, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
stats.incrementSuccessful();
}
@Override
public void onFailure(Throwable t) {
log.error("[{}] Failed to send response {}", requestId, response, t);
sendErrorResponse(requestId, responseTopic, request, t);
stats.incrementFailed();
}
});
},
e -> {
pendingRequestCount.decrementAndGet();
@ -144,6 +158,16 @@ public class PartitionedQueueResponseTemplate<Request extends TbQueueMsg, Respon
consumer.commit();
}
private void sendErrorResponse(UUID requestId, String responseTopic, Request request, Throwable cause) {
Response errorResponseMsg = handler.constructErrorResponseMsg(request, cause);
if (errorResponseMsg != null) {
errorResponseMsg.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(responseTopic).build();
responseProducer.send(tpi, errorResponseMsg, null);
}
}
public void subscribe(Set<TopicPartitionInfo> partitions) {
requestConsumer.update(partitions);
}