From 104ad0deb9c79f392e1c4234aca7983202aa21f3 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 17 Oct 2024 13:57:50 +0300 Subject: [PATCH 1/5] activity: Failed to send message to core --- .../service/DefaultTransportService.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 8e48061db6..43db4b7b23 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.device.data.PowerMode; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -1100,16 +1101,19 @@ public class DefaultTransportService extends TransportActivityManager implements } private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); - if (log.isTraceEnabled()) { - log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); + try { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (log.isTraceEnabled()) { + log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); + } + TransportTbQueueCallback transportTbQueueCallback = callback != null ? + new TransportTbQueueCallback(callback) : null; + tbCoreProducerStats.incrementTotal(); + StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); + } catch (TenantNotFoundException e) { + log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e); } - - TransportTbQueueCallback transportTbQueueCallback = callback != null ? - new TransportTbQueueCallback(callback) : null; - tbCoreProducerStats.incrementTotal(); - StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); } private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, From 12c35d903ad42837092ae459374e01a3521ee568 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 17 Oct 2024 16:46:16 +0300 Subject: [PATCH 2/5] reportActivity: add error to callback --- .../service/DefaultTransportService.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 43db4b7b23..c337a285e7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1101,19 +1101,24 @@ public class DefaultTransportService extends TransportActivityManager implements } private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback callback) { + TopicPartitionInfo tpi; try { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); - if (log.isTraceEnabled()) { - log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); - } - TransportTbQueueCallback transportTbQueueCallback = callback != null ? - new TransportTbQueueCallback(callback) : null; - tbCoreProducerStats.incrementTotal(); - StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); + tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); } catch (TenantNotFoundException e) { log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e); + tpi = TopicPartitionInfo.builder().topic(e.getMessage()).build(); + if (callback != null) { + callback.onError(e); + } } + if (log.isTraceEnabled()) { + log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); + } + TransportTbQueueCallback transportTbQueueCallback = callback != null ? + new TransportTbQueueCallback(callback) : null; + tbCoreProducerStats.incrementTotal(); + StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); } private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, From 2d4a4ad9916d18b84cb01b0c83d3c2d3cd49feb9 Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 17 Oct 2024 17:18:52 +0300 Subject: [PATCH 3/5] reportActivity: add topic empty --- .../common/transport/service/DefaultTransportService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index c337a285e7..edb0e5c44a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1106,7 +1106,7 @@ public class DefaultTransportService extends TransportActivityManager implements tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); } catch (TenantNotFoundException e) { log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e); - tpi = TopicPartitionInfo.builder().topic(e.getMessage()).build(); + tpi = TopicPartitionInfo.builder().topic("").build(); if (callback != null) { callback.onError(e); } From d77e557945040180f9a65262c0311a7e6ea69df8 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 18 Oct 2024 14:32:26 +0300 Subject: [PATCH 4/5] activity: add return if callback error --- .../common/transport/service/DefaultTransportService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index edb0e5c44a..15073ba841 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1104,12 +1104,12 @@ public class DefaultTransportService extends TransportActivityManager implements TopicPartitionInfo tpi; try { tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); - } catch (TenantNotFoundException e) { - log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e); - tpi = TopicPartitionInfo.builder().topic("").build(); + } catch (Exception e) { + log.trace("Failed to send message to core. Tenant with ID [{}], entityType [{}], entityId [{}], routingKey [{}], \nmsg [{}].\n Message delivery aborted.", tenantId, entityId.getEntityType(), entityId.toString(), routingKey, msg, e); if (callback != null) { callback.onError(e); } + return; } if (log.isTraceEnabled()) { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); From 737962ae075743d6696a10627ea70ed75116801a Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 24 Oct 2024 13:17:12 +0300 Subject: [PATCH 5/5] Activity: comments1 --- .../common/transport/service/DefaultTransportService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 15073ba841..44f61270b4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1105,7 +1105,7 @@ public class DefaultTransportService extends TransportActivityManager implements try { tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); } catch (Exception e) { - log.trace("Failed to send message to core. Tenant with ID [{}], entityType [{}], entityId [{}], routingKey [{}], \nmsg [{}].\n Message delivery aborted.", tenantId, entityId.getEntityType(), entityId.toString(), routingKey, msg, e); + log.warn("Failed to send message to core. Tenant with ID [{}], routingKey [{}], msg [{}]. Message delivery aborted.", tenantId, routingKey, msg, e); if (callback != null) { callback.onError(e); }