From a843c86227b1b0daec226cbce7bc0d2f3998aa6d Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 18 Jun 2025 16:42:45 +0300 Subject: [PATCH] EDQS: human readable response on failed to send kafka msg --- .../server/queue/TbQueueHandler.java | 7 +++-- .../server/edqs/processor/EdqsProcessor.java | 19 +++++++++++++ .../PartitionedQueueResponseTemplate.java | 28 +++++++++++++++++-- 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueHandler.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueHandler.java index bb5bc1d668..523a97e13c 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueHandler.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueHandler.java @@ -17,11 +17,12 @@ package org.thingsboard.server.queue; import com.google.common.util.concurrent.ListenableFuture; -/** - * Created by ashvayka on 05.10.18. - */ public interface TbQueueHandler { ListenableFuture handle(Request request); + default Response constructErrorResponseMsg(Request request, Throwable cause) { + return null; + } + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 0e74cb98fa..13a54973e6 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -203,6 +203,25 @@ public class EdqsProcessor implements TbQueueHandler, }); } + @Override + public TbProtoQueueMsg constructErrorResponseMsg(TbProtoQueueMsg request, Throwable e) { + EdqsResponse response = new EdqsResponse(); + String errorMessage; + if (e instanceof org.apache.kafka.common.errors.RecordTooLargeException) { + errorMessage = "Result set is too large"; + } else if (e instanceof IllegalArgumentException || e instanceof NullPointerException) { + errorMessage = "Invalid request format or missing data: " + ExceptionUtil.getMessage(e); + } else { + errorMessage = ExceptionUtil.getMessage(e); + } + response.setError(errorMessage); + return new TbProtoQueueMsg<>(request.getKey(), FromEdqsMsg.newBuilder() + .setResponseMsg(TransportProtos.EdqsResponseMsg.newBuilder() + .setValue(JacksonUtil.toString(response)) + .build()) + .build(), request.getHeaders()); + } + private EdqsResponse processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) { EdqsResponse response = new EdqsResponse(); try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java index 7e913009f0..a15fafa19a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java @@ -21,9 +21,11 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.MessagesStats; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueHandler; import org.thingsboard.server.queue.TbQueueMsg; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; @@ -119,8 +121,20 @@ public class PartitionedQueueResponseTemplate { pendingRequestCount.decrementAndGet(); response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId)); - responseProducer.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null); - stats.incrementSuccessful(); + TopicPartitionInfo tpi = TopicPartitionInfo.builder().topic(responseTopic).build(); + responseProducer.send(tpi, response, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + stats.incrementSuccessful(); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Failed to send response {}", requestId, response, t); + sendErrorResponse(requestId, responseTopic, request, t); + stats.incrementFailed(); + } + }); }, e -> { pendingRequestCount.decrementAndGet(); @@ -144,6 +158,16 @@ public class PartitionedQueueResponseTemplate partitions) { requestConsumer.update(partitions); }