reportActivity: add error to callback

This commit is contained in:
nick 2024-10-17 16:46:16 +03:00
parent 104ad0deb9
commit 12c35d903a

View File

@ -1101,19 +1101,24 @@ public class DefaultTransportService extends TransportActivityManager implements
}
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback<Void> callback) {
TopicPartitionInfo tpi;
try {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg);
}
TransportTbQueueCallback transportTbQueueCallback = callback != null ?
new TransportTbQueueCallback(callback) : null;
tbCoreProducerStats.incrementTotal();
StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback);
tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
} catch (TenantNotFoundException e) {
log.trace("Failed to send message to core. Tenant with ID [{}] not found in the database. Message delivery aborted.", tenantId, e);
tpi = TopicPartitionInfo.builder().topic(e.getMessage()).build();
if (callback != null) {
callback.onError(e);
}
}
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg);
}
TransportTbQueueCallback transportTbQueueCallback = callback != null ?
new TransportTbQueueCallback(callback) : null;
tbCoreProducerStats.incrementTotal();
StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback);
}
private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json,