diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 4043b519da..0a47f5f375 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -282,7 +282,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer if (sessionSubscriptions != null) { TbSubscription subscription = sessionSubscriptions.remove(subscriptionId); if (subscription != null) { - if (sessionSubscriptions.isEmpty()) { subscriptionsBySessionId.remove(sessionId); } @@ -304,22 +303,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer @Override public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) { log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId); - List results = new ArrayList<>(); Lock subsLock = getSubsLock(tenantId); subsLock.lock(); try { Map> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); if (sessionSubscriptions != null) { - for (TbSubscription subscription : sessionSubscriptions.values()) { - results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false)); - } + Map>> entitySubscriptions = + sessionSubscriptions.values().stream().collect(Collectors.groupingBy(TbSubscription::getEntityId)); + + entitySubscriptions.forEach((entityId, subscriptions) -> { + TbEntitySubEvent event = removeAllSubscriptions(tenantId, entityId, subscriptions); + if (event != null) { + pushSubscriptionsEvent(tenantId, entityId, event); + } + }); } else { log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); } } finally { subsLock.unlock(); } - results.stream().filter(SubscriptionModificationResult::hasEvent).forEach(this::pushSubscriptionEvent); } @Override @@ -500,6 +503,30 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer return new SubscriptionModificationResult(tenantId, entityId, subscription, missedUpdatesCandidate, event); } + private TbEntitySubEvent removeAllSubscriptions(TenantId tenantId, EntityId entityId, List> subscriptions) { + TbEntitySubEvent event = null; + try { + TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId.getId()); + event = entitySubs.removeAll(subscriptions); + if (entitySubs.isEmpty()) { + subscriptionsByEntityId.remove(entityId.getId()); + entityUpdates.remove(entityId.getId()); + } + } catch (Exception e) { + log.warn("[{}][{}] Failed to remove all subscriptions {} due to ", tenantId, entityId, subscriptions, e); + } + return event; + } + + private void pushSubscriptionsEvent(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) { + try { + log.trace("[{}][{}] Event: {}", tenantId, entityId, event); + pushSubEventToManagerService(tenantId, entityId, event); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push subscription event {} due to ", tenantId, entityId, event, e); + } + } + private void pushSubscriptionEvent(SubscriptionModificationResult modificationResult) { try { TbEntitySubEvent event = modificationResult.getEvent(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java index f5a5639ec0..ee20843538 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfo.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -129,13 +130,64 @@ public class TbEntityLocalSubsInfo { if (!subs.remove(sub)) { return null; } - if (subs.isEmpty()) { + if (isEmpty()) { return toEvent(ComponentLifecycleEvent.DELETED); } - TbSubscriptionsInfo oldState = state.copy(); - TbSubscriptionsInfo newState = new TbSubscriptionsInfo(); + TbSubscriptionType type = sub.getType(); + TbSubscriptionsInfo newState = state.copy(); + clearState(newState, type); + return updateState(Set.of(type), newState); + } + + public TbEntitySubEvent removeAll(List> subsToRemove) { + Set changedTypes = new HashSet<>(); + TbSubscriptionsInfo newState = state.copy(); + for (TbSubscription sub : subsToRemove) { + log.trace("[{}][{}][{}] Removing: {}", tenantId, entityId, sub.getSubscriptionId(), sub); + if (!subs.remove(sub)) { + continue; + } + if (isEmpty()) { + return toEvent(ComponentLifecycleEvent.DELETED); + } + TbSubscriptionType type = sub.getType(); + if (changedTypes.contains(type)) { + continue; + } + + clearState(newState, type); + changedTypes.add(type); + } + + return updateState(changedTypes, newState); + } + + private void clearState(TbSubscriptionsInfo state, TbSubscriptionType type) { + switch (type) { + case NOTIFICATIONS: + case NOTIFICATIONS_COUNT: + state.notifications = false; + break; + case ALARMS: + state.alarms = false; + break; + case ATTRIBUTES: + state.attrAllKeys = false; + state.attrKeys = null; + break; + case TIMESERIES: + state.tsAllKeys = false; + state.tsKeys = null; + } + } + + private TbEntitySubEvent updateState(Set updatedTypes, TbSubscriptionsInfo newState) { for (TbSubscription subscription : subs) { - switch (subscription.getType()) { + TbSubscriptionType type = subscription.getType(); + if (!updatedTypes.contains(type)) { + continue; + } + switch (type) { case NOTIFICATIONS: case NOTIFICATIONS_COUNT: if (!newState.notifications) { @@ -173,7 +225,7 @@ public class TbEntityLocalSubsInfo { break; } } - if (newState.equals(oldState)) { + if (newState.equals(state)) { return null; } else { this.state = newState; @@ -196,7 +248,7 @@ public class TbEntityLocalSubsInfo { public boolean isEmpty() { - return state.isEmpty(); + return subs.isEmpty(); } public TbSubscription registerPendingSubscription(TbSubscription subscription, TbEntitySubEvent event) { diff --git a/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfoTest.java b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfoTest.java new file mode 100644 index 0000000000..e9b95ec832 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/subscription/TbEntityLocalSubsInfoTest.java @@ -0,0 +1,186 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import org.junit.jupiter.api.Test; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TbEntityLocalSubsInfoTest { + + @Test + public void addTest() { + Set expectedSubs = new HashSet<>(); + TbEntityLocalSubsInfo subsInfo = createSubsInfo(); + TenantId tenantId = subsInfo.getTenantId(); + EntityId entityId = subsInfo.getEntityId(); + TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder() + .sessionId("session1") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key1", 1L, "key2", 2L)) + .build(); + expectedSubs.add(attrSubscription1); + TbEntitySubEvent created = subsInfo.add(attrSubscription1); + assertFalse(subsInfo.isEmpty()); + assertNotNull(created); + assertEquals(expectedSubs, subsInfo.getSubs()); + checkEvent(created, expectedSubs, ComponentLifecycleEvent.CREATED); + + assertNull(subsInfo.add(attrSubscription1)); + + TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder() + .sessionId("session2") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key3", 3L, "key4", 4L)) + .build(); + expectedSubs.add(attrSubscription2); + TbEntitySubEvent updated = subsInfo.add(attrSubscription2); + assertNotNull(updated); + + assertEquals(expectedSubs, subsInfo.getSubs()); + checkEvent(updated, expectedSubs, ComponentLifecycleEvent.UPDATED); + } + + @Test + public void removeTest() { + Set expectedSubs = new HashSet<>(); + TbEntityLocalSubsInfo subsInfo = createSubsInfo(); + TenantId tenantId = subsInfo.getTenantId(); + EntityId entityId = subsInfo.getEntityId(); + TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder() + .sessionId("session1") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key1", 1L, "key2", 2L)) + .build(); + + TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder() + .sessionId("session2") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key3", 3L, "key4", 4L)) + .build(); + + expectedSubs.add(attrSubscription1); + expectedSubs.add(attrSubscription2); + + subsInfo.add(attrSubscription1); + subsInfo.add(attrSubscription2); + + assertEquals(expectedSubs, subsInfo.getSubs()); + + TbEntitySubEvent updatedEvent = subsInfo.remove(attrSubscription1); + expectedSubs.remove(attrSubscription1); + assertNotNull(updatedEvent); + assertEquals(expectedSubs, subsInfo.getSubs()); + checkEvent(updatedEvent, expectedSubs, ComponentLifecycleEvent.UPDATED); + + TbEntitySubEvent deletedEvent = subsInfo.remove(attrSubscription2); + expectedSubs.remove(attrSubscription2); + assertNotNull(deletedEvent); + assertEquals(expectedSubs, subsInfo.getSubs()); + checkEvent(deletedEvent, expectedSubs, ComponentLifecycleEvent.DELETED); + + assertTrue(subsInfo.isEmpty()); + } + + @Test + public void removeAllTest() { + TbEntityLocalSubsInfo subsInfo = createSubsInfo(); + TenantId tenantId = subsInfo.getTenantId(); + EntityId entityId = subsInfo.getEntityId(); + TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder() + .sessionId("session1") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key1", 1L, "key2", 2L)) + .build(); + + TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder() + .sessionId("session2") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key3", 3L, "key4", 4L)) + .build(); + + TbAttributeSubscription attrSubscription3 = TbAttributeSubscription.builder() + .sessionId("session3") + .tenantId(tenantId) + .entityId(entityId) + .keyStates(Map.of("key5", 5L, "key6", 6L)) + .build(); + + subsInfo.add(attrSubscription1); + subsInfo.add(attrSubscription2); + subsInfo.add(attrSubscription3); + + assertFalse(subsInfo.isEmpty()); + + TbEntitySubEvent updatedEvent = subsInfo.removeAll(List.of(attrSubscription1, attrSubscription2)); + assertNotNull(updatedEvent); + checkEvent(updatedEvent, Set.of(attrSubscription3), ComponentLifecycleEvent.UPDATED); + + assertFalse(subsInfo.isEmpty()); + + TbEntitySubEvent deletedEvent = subsInfo.removeAll(List.of(attrSubscription3)); + assertNotNull(deletedEvent); + checkEvent(deletedEvent, null, ComponentLifecycleEvent.DELETED); + + assertTrue(subsInfo.isEmpty()); + } + + private TbEntityLocalSubsInfo createSubsInfo() { + return new TbEntityLocalSubsInfo(new TenantId(UUID.randomUUID()), new DeviceId(UUID.randomUUID())); + } + + private void checkEvent(TbEntitySubEvent event, Set expectedSubs, ComponentLifecycleEvent expectedType) { + assertEquals(expectedType, event.getType()); + TbSubscriptionsInfo info = event.getInfo(); + if (event.getType() == ComponentLifecycleEvent.DELETED) { + assertNull(info); + return; + } + assertNotNull(info); + assertFalse(info.notifications); + assertFalse(info.alarms); + assertFalse(info.attrAllKeys); + assertFalse(info.tsAllKeys); + assertNull(info.tsKeys); + assertEquals(getAttrKeys(expectedSubs), info.attrKeys); + } + + private Set getAttrKeys(Set attributeSubscriptions) { + return attributeSubscriptions.stream().map(s -> s.getKeyStates().keySet()).flatMap(Collection::stream).collect(Collectors.toSet()); + } +}