From d045ef23e23bbd287d1748eac8e1cef1be36a7dd Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 14 Apr 2025 11:41:24 +0300 Subject: [PATCH] Edge - slow down processing of edge events for SYS_TENANT_ID --- .../edge/EdgeEventSourcingListener.java | 5 +++++ .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 22 +++++++++++++------ 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index c62a551310..8ec42b4a10 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -167,6 +167,11 @@ public class EdgeEventSourcingListener { @TransactionalEventListener(fallbackExecution = true) public void handleEvent(RelationActionEvent event) { try { + TenantId tenantId = event.getTenantId(); + if (!tenantId.isSysTenantId() && !tenantService.tenantExists(tenantId)) { + log.debug("[{}] Ignoring RelationActionEvent because tenant does not exist: {}", tenantId, event); + return; + } EntityRelation relation = event.getRelation(); if (relation == null) { log.trace("[{}] skipping RelationActionEvent event in case relation is null: {}", event.getTenantId(), event); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9ce5973639..7861885989 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -528,7 +528,7 @@ public abstract class EdgeGrpcSession implements Closeable { sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg); } else { - log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); + log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId()); // if NOT timeseries or attributes failures - ack failed downlink if (downlinkMsg.getEntityDataCount() == 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9f30d70fb2..7ce1159397 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -142,17 +143,24 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { protected ListenableFuture processActionForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, - JsonNode body, EdgeId sourceEdgeId) { - List> futures = new ArrayList<>(); + EdgeId sourceEdgeId) { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { - PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 1024); + PageDataIterable tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500); for (TenantId tenantId1 : tenantIds) { - futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId)); + try { + List> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null, sourceEdgeId); + for (ListenableFuture future : sysTenantFutures) { + future.get(10, TimeUnit.SECONDS); + } + } catch (Exception e) { + log.error("Failed to process action for all edges by SYS_TENANT_ID. Failed tenantId = [{}]", tenantId1, e); + } } + return Futures.immediateFuture(null); } else { - futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); + List> tenantFutures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId); + return Futures.transform(Futures.allAsList(tenantFutures), voids -> null, dbCallbackExecutorService); } - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } private List> processActionForAllEdgesByTenantId(TenantId tenantId, @@ -284,7 +292,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { return switch (actionType) { case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity - processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); + processActionForAllEdges(tenantId, type, actionType, entityId, sourceEdgeId); default -> Futures.immediateFuture(null); }; }