Handle TenantNotFoundException for stale subscriptions (#10973)
* Handle TenantNotFoundException for stale subscriptions * Minor refactoring for stale subs cleanup
This commit is contained in:
		
							parent
							
								
									159c97ac67
								
							
						
					
					
						commit
						93e62dd1e8
					
				@ -15,6 +15,8 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.subscription;
 | 
			
		||||
 | 
			
		||||
import jakarta.annotation.PostConstruct;
 | 
			
		||||
import jakarta.annotation.PreDestroy;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.context.event.EventListener;
 | 
			
		||||
@ -24,9 +26,9 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.AttributeScope;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.Aggregation;
 | 
			
		||||
@ -51,8 +53,6 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscript
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
 | 
			
		||||
 | 
			
		||||
import jakarta.annotation.PostConstruct;
 | 
			
		||||
import jakarta.annotation.PreDestroy;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -143,7 +143,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
 | 
			
		||||
             * Even if we cache locally the list of active subscriptions by entity id, it is still time-consuming operation to get them from cache
 | 
			
		||||
             * Since number of subscriptions is usually much less than number of devices that are pushing data.
 | 
			
		||||
             */
 | 
			
		||||
            subscriptionsByEntityId.values().forEach(sub -> pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED)));
 | 
			
		||||
            Set<UUID> staleSubs = new HashSet<>();
 | 
			
		||||
            subscriptionsByEntityId.forEach((id, sub) -> {
 | 
			
		||||
                try {
 | 
			
		||||
                    pushSubEventToManagerService(sub.getTenantId(), sub.getEntityId(), sub.toEvent(ComponentLifecycleEvent.UPDATED));
 | 
			
		||||
                } catch (TenantNotFoundException e) {
 | 
			
		||||
                    staleSubs.add(id);
 | 
			
		||||
                    log.warn("Cleaning up stale subscription {} for tenant {} due to TenantNotFoundException", id, sub.getTenantId());
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.error("Failed to push subscription {} to manager service", sub, e);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            if (!staleSubs.isEmpty()) {
 | 
			
		||||
                staleSubs.forEach(subscriptionsByEntityId::remove);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user