From 93e62dd1e84c9c63fc496ea2f8b137695a5ff1d2 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Fri, 14 Jun 2024 18:13:05 +0300 Subject: [PATCH] Handle TenantNotFoundException for stale subscriptions (#10973) * Handle TenantNotFoundException for stale subscriptions * Minor refactoring for stale subs cleanup --- .../DefaultTbLocalSubscriptionService.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 8abb78d9c1..100636ad5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.service.subscription; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; @@ -24,9 +26,9 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; @@ -51,8 +53,6 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscript import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -143,7 +143,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache * Since number of subscriptions is usually much less than number of devices that are pushing data. */ - subscriptionsByEntityId.values().forEach(sub -> pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED))); + Set staleSubs = new HashSet<>(); + subscriptionsByEntityId.forEach((id, sub) -> { + try { + pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED)); + } catch (TenantNotFoundException e) { + staleSubs.add(id); + log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, sub.getTenantId()); + } catch (Exception e) { + log.error("Failed to push subscription {} to manager service", sub, e); + } + }); + if (!staleSubs.isEmpty()) { + staleSubs.forEach(subscriptionsByEntityId::remove); + } } }