diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java index 9bcd8fc373..d996a1dbc9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java @@ -507,7 +507,7 @@ public class EdgeMsgConstructorUtils { JsonObject data = entityData.getAsJsonObject(); builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data.getAsJsonObject("data"), ts)); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to telemetry proto, entityData [{}]", tenantId, entityId, entityData, e); } break; case ATTRIBUTES_UPDATED: @@ -522,7 +522,7 @@ public class EdgeMsgConstructorUtils { builder.setPostAttributeScope(getScopeOfDefault(data)); builder.setAttributeTs(ts); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } break; case POST_ATTRIBUTES: @@ -533,7 +533,7 @@ public class EdgeMsgConstructorUtils { builder.setPostAttributeScope(getScopeOfDefault(data)); builder.setAttributeTs(ts); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to PostAttributesMsg, entityData [{}]", tenantId, entityId, entityData, e); } break; case ATTRIBUTES_DELETED: @@ -546,7 +546,7 @@ public class EdgeMsgConstructorUtils { attributeDeleteMsg.build(); builder.setAttributeDeleteMsg(attributeDeleteMsg); } catch (Exception e) { - log.warn("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e); + log.trace("[{}][{}] Can't convert to AttributeDeleteMsg proto, entityData [{}]", tenantId, entityId, entityData, e); } break; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index ecd0c6d9f3..9ce5973639 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -203,7 +203,7 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onError(Throwable t) { - log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); + log.trace("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); closeSession(); } @@ -255,7 +255,7 @@ public abstract class EdgeGrpcSession implements Closeable { private void doSync(EdgeSyncCursor cursor) { if (cursor.hasNext()) { EdgeEventFetcher next = cursor.getNext(); - log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}", + log.debug("[{}][{}] starting sync process, cursor current idx = {}, class = {}", tenantId, edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName()); ListenableFuture> future = startProcessingEdgeEvents(next); Futures.addCallback(future, new FutureCallback<>() { @@ -651,7 +651,7 @@ public abstract class EdgeGrpcSession implements Closeable { default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); } } catch (Exception e) { - log.error("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); + log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); } if (downlinkMsg != null) { result.add(downlinkMsg); @@ -763,7 +763,7 @@ public abstract class EdgeGrpcSession implements Closeable { try { outputStream.onNext(responseMsg); } catch (Exception e) { - log.error("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); + log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); connected = false; sessionCloseListener.accept(edge, sessionId); } finally { @@ -909,7 +909,7 @@ public abstract class EdgeGrpcSession implements Closeable { } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); - log.error("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); + log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); return Futures.immediateFailedFuture(e); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ecd4ddebb7..b4d178a737 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1590,11 +1590,24 @@ queue: # tb_rule_engine.sq: # - key: max.poll.records # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" - tb_edge_event.notifications: - # Example of specific consumer properties value per topic for edge event + tb_edge: + # Properties for consumers targeting edge service update topics. - key: max.poll.records - # Example of specific consumer properties value per topic for edge event - value: "${TB_QUEUE_KAFKA_EDGE_EVENT_MAX_POLL_RECORDS:50}" + # Define the maximum number of records that can be polled from tb_edge topics per request. + value: "${TB_QUEUE_KAFKA_EDGE_EVENTS_MAX_POLL_RECORDS:10}" + tb_edge.notifications: + # Properties for consumers targeting high-priority edge notifications. + # These notifications include RPC calls, lifecycle events, and new queue messages, + # requiring minimal latency and swift processing. + - key: max.poll.records + # Define the maximum number of records that can be polled from tb_edge.notifications. topics. + value: "${TB_QUEUE_KAFKA_EDGE_HP_EVENTS_MAX_POLL_RECORDS:10}" + tb_edge_event.notifications: + # Properties for consumers targeting downlinks meant for specific edge topics. + # Topic names are dynamically constructed using tenant and edge identifiers. + - key: max.poll.records + # Define the maximum number of records that can be polled from tb_edge_event.notifications.. topics. + value: "${TB_QUEUE_KAFKA_EDGE_NOTIFICATIONS_MAX_POLL_RECORDS:10}" tb_housekeeper: # Consumer properties for Housekeeper tasks topic - key: max.poll.records @@ -1841,11 +1854,13 @@ queue: # Interval in milliseconds to poll messages poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}" edge: - # Default topic name + # Topic name to notify edge service on entity updates, assignment, etc. topic: "${TB_QUEUE_EDGE_TOPIC:tb_edge}" - # For high-priority notifications that require minimum latency and processing time + # Topic prefix for high-priority edge notifications (rpc, lifecycle, new messages in queue) that require minimum latency and processing time. + # Each tb-core has its own topic: . notifications_topic: "${TB_QUEUE_EDGE_NOTIFICATIONS_TOPIC:tb_edge.notifications}" - # For edge events messages + # Topic prefix for downlinks to be pushed to specific edge. + # Every edge has its own unique topic: .. event_notifications_topic: "${TB_QUEUE_EDGE_EVENT_NOTIFICATIONS_TOPIC:tb_edge_event.notifications}" # Amount of partitions used by Edge services partitions: "${TB_QUEUE_EDGE_PARTITIONS:10}" diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index 42263138c1..7271e1cb10 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -38,7 +38,7 @@ public class TopicService { @Value("${queue.rule-engine.notifications-topic:tb_rule_engine.notifications}") private String tbRuleEngineNotificationsTopic; - @Value("${queue.transport.notifications-topics:tb_transport.notifications}") + @Value("${queue.transport.notifications-topic:tb_transport.notifications}") private String tbTransportNotificationsTopic; @Value("${queue.edge.notifications-topic:tb_edge.notifications}") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 181d422f16..82b5af179f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -50,8 +50,6 @@ import java.util.Properties; @Component public class TbKafkaSettings { - private static final List DYNAMIC_TOPICS = List.of("tb_edge_event.notifications"); - @Value("${queue.kafka.bootstrap.servers}") private String servers; @@ -163,18 +161,20 @@ public class TbKafkaSettings { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerPropertiesPerTopic - .getOrDefault(topic, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue())); - if (topic != null) { - DYNAMIC_TOPICS.stream() - .filter(topic::startsWith) - .findFirst() - .ifPresent(prefix -> consumerPropertiesPerTopic.getOrDefault(prefix, Collections.emptyList()) - .forEach(kv -> props.put(kv.getKey(), kv.getValue()))); + List properties = consumerPropertiesPerTopic.get(topic); + if (properties == null) { + for (Map.Entry> entry : consumerPropertiesPerTopic.entrySet()) { + if (topic.startsWith(entry.getKey())) { + properties = entry.getValue(); + break; + } + } + } + if (properties != null) { + properties.forEach(kv -> props.put(kv.getKey(), kv.getValue())); + } } - return props; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java index 7c3e415e9f..98a3d78304 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java @@ -115,7 +115,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { return toHousekeeper; } - @Override public TbQueueProducer> getTbEdgeMsgProducer() { return toEdge;