diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 43db4b7b23..c337a285e7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1101,19 +1101,24 @@ public class DefaultTransportService extends TransportActivityManager implements } private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback callback) { + TopicPartitionInfo tpi; try { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); - if (log.isTraceEnabled()) { - log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); - } - TransportTbQueueCallback transportTbQueueCallback = callback != null ? - new TransportTbQueueCallback(callback) : null; - tbCoreProducerStats.incrementTotal(); - StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); + tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); } catch (TenantNotFoundException e) { log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e); + tpi = TopicPartitionInfo.builder().topic(e.getMessage()).build(); + if (callback != null) { + callback.onError(e); + } } + if (log.isTraceEnabled()) { + log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); + } + TransportTbQueueCallback transportTbQueueCallback = callback != null ? + new TransportTbQueueCallback(callback) : null; + tbCoreProducerStats.incrementTotal(); + StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); } private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,