Merge pull request #11693 from YevhenBondarenko/feature/ws-remove-all-optimization

Added ability to remove subscriptions by batch and update state once
This commit is contained in:
Andrew Shvayka 2024-10-17 17:12:18 +03:00 committed by GitHub
commit eea9caa286
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 277 additions and 12 deletions

View File

@ -282,7 +282,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId); TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId);
if (subscription != null) { if (subscription != null) {
if (sessionSubscriptions.isEmpty()) { if (sessionSubscriptions.isEmpty()) {
subscriptionsBySessionId.remove(sessionId); subscriptionsBySessionId.remove(sessionId);
} }
@ -304,22 +303,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
@Override @Override
public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) { public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) {
log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId); log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId);
List<SubscriptionModificationResult> results = new ArrayList<>();
Lock subsLock = getSubsLock(tenantId); Lock subsLock = getSubsLock(tenantId);
subsLock.lock(); subsLock.lock();
try { try {
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId); Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId);
if (sessionSubscriptions != null) { if (sessionSubscriptions != null) {
for (TbSubscription<?> subscription : sessionSubscriptions.values()) { Map<EntityId, List<TbSubscription<?>>> entitySubscriptions =
results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false)); sessionSubscriptions.values().stream().collect(Collectors.groupingBy(TbSubscription::getEntityId));
entitySubscriptions.forEach((entityId, subscriptions) -> {
TbEntitySubEvent event = removeAllSubscriptions(tenantId, entityId, subscriptions);
if (event != null) {
pushSubscriptionsEvent(tenantId, entityId, event);
} }
});
} else { } else {
log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId); log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
} }
} finally { } finally {
subsLock.unlock(); subsLock.unlock();
} }
results.stream().filter(SubscriptionModificationResult::hasEvent).forEach(this::pushSubscriptionEvent);
} }
@Override @Override
@ -500,6 +503,30 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
return new SubscriptionModificationResult(tenantId, entityId, subscription, missedUpdatesCandidate, event); return new SubscriptionModificationResult(tenantId, entityId, subscription, missedUpdatesCandidate, event);
} }
private TbEntitySubEvent removeAllSubscriptions(TenantId tenantId, EntityId entityId, List<TbSubscription<?>> subscriptions) {
TbEntitySubEvent event = null;
try {
TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId.getId());
event = entitySubs.removeAll(subscriptions);
if (entitySubs.isEmpty()) {
subscriptionsByEntityId.remove(entityId.getId());
entityUpdates.remove(entityId.getId());
}
} catch (Exception e) {
log.warn("[{}][{}] Failed to remove all subscriptions {} due to ", tenantId, entityId, subscriptions, e);
}
return event;
}
private void pushSubscriptionsEvent(TenantId tenantId, EntityId entityId, TbEntitySubEvent event) {
try {
log.trace("[{}][{}] Event: {}", tenantId, entityId, event);
pushSubEventToManagerService(tenantId, entityId, event);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push subscription event {} due to ", tenantId, entityId, event, e);
}
}
private void pushSubscriptionEvent(SubscriptionModificationResult modificationResult) { private void pushSubscriptionEvent(SubscriptionModificationResult modificationResult) {
try { try {
TbEntitySubEvent event = modificationResult.getEvent(); TbEntitySubEvent event = modificationResult.getEvent();

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -129,13 +130,64 @@ public class TbEntityLocalSubsInfo {
if (!subs.remove(sub)) { if (!subs.remove(sub)) {
return null; return null;
} }
if (subs.isEmpty()) { if (isEmpty()) {
return toEvent(ComponentLifecycleEvent.DELETED); return toEvent(ComponentLifecycleEvent.DELETED);
} }
TbSubscriptionsInfo oldState = state.copy(); TbSubscriptionType type = sub.getType();
TbSubscriptionsInfo newState = new TbSubscriptionsInfo(); TbSubscriptionsInfo newState = state.copy();
clearState(newState, type);
return updateState(Set.of(type), newState);
}
public TbEntitySubEvent removeAll(List<? extends TbSubscription<?>> subsToRemove) {
Set<TbSubscriptionType> changedTypes = new HashSet<>();
TbSubscriptionsInfo newState = state.copy();
for (TbSubscription<?> sub : subsToRemove) {
log.trace("[{}][{}][{}] Removing: {}", tenantId, entityId, sub.getSubscriptionId(), sub);
if (!subs.remove(sub)) {
continue;
}
if (isEmpty()) {
return toEvent(ComponentLifecycleEvent.DELETED);
}
TbSubscriptionType type = sub.getType();
if (changedTypes.contains(type)) {
continue;
}
clearState(newState, type);
changedTypes.add(type);
}
return updateState(changedTypes, newState);
}
private void clearState(TbSubscriptionsInfo state, TbSubscriptionType type) {
switch (type) {
case NOTIFICATIONS:
case NOTIFICATIONS_COUNT:
state.notifications = false;
break;
case ALARMS:
state.alarms = false;
break;
case ATTRIBUTES:
state.attrAllKeys = false;
state.attrKeys = null;
break;
case TIMESERIES:
state.tsAllKeys = false;
state.tsKeys = null;
}
}
private TbEntitySubEvent updateState(Set<TbSubscriptionType> updatedTypes, TbSubscriptionsInfo newState) {
for (TbSubscription<?> subscription : subs) { for (TbSubscription<?> subscription : subs) {
switch (subscription.getType()) { TbSubscriptionType type = subscription.getType();
if (!updatedTypes.contains(type)) {
continue;
}
switch (type) {
case NOTIFICATIONS: case NOTIFICATIONS:
case NOTIFICATIONS_COUNT: case NOTIFICATIONS_COUNT:
if (!newState.notifications) { if (!newState.notifications) {
@ -173,7 +225,7 @@ public class TbEntityLocalSubsInfo {
break; break;
} }
} }
if (newState.equals(oldState)) { if (newState.equals(state)) {
return null; return null;
} else { } else {
this.state = newState; this.state = newState;
@ -196,7 +248,7 @@ public class TbEntityLocalSubsInfo {
public boolean isEmpty() { public boolean isEmpty() {
return state.isEmpty(); return subs.isEmpty();
} }
public TbSubscription<?> registerPendingSubscription(TbSubscription<?> subscription, TbEntitySubEvent event) { public TbSubscription<?> registerPendingSubscription(TbSubscription<?> subscription, TbEntitySubEvent event) {

View File

@ -0,0 +1,186 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TbEntityLocalSubsInfoTest {
@Test
public void addTest() {
Set<TbAttributeSubscription> expectedSubs = new HashSet<>();
TbEntityLocalSubsInfo subsInfo = createSubsInfo();
TenantId tenantId = subsInfo.getTenantId();
EntityId entityId = subsInfo.getEntityId();
TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder()
.sessionId("session1")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key1", 1L, "key2", 2L))
.build();
expectedSubs.add(attrSubscription1);
TbEntitySubEvent created = subsInfo.add(attrSubscription1);
assertFalse(subsInfo.isEmpty());
assertNotNull(created);
assertEquals(expectedSubs, subsInfo.getSubs());
checkEvent(created, expectedSubs, ComponentLifecycleEvent.CREATED);
assertNull(subsInfo.add(attrSubscription1));
TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder()
.sessionId("session2")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key3", 3L, "key4", 4L))
.build();
expectedSubs.add(attrSubscription2);
TbEntitySubEvent updated = subsInfo.add(attrSubscription2);
assertNotNull(updated);
assertEquals(expectedSubs, subsInfo.getSubs());
checkEvent(updated, expectedSubs, ComponentLifecycleEvent.UPDATED);
}
@Test
public void removeTest() {
Set<TbAttributeSubscription> expectedSubs = new HashSet<>();
TbEntityLocalSubsInfo subsInfo = createSubsInfo();
TenantId tenantId = subsInfo.getTenantId();
EntityId entityId = subsInfo.getEntityId();
TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder()
.sessionId("session1")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key1", 1L, "key2", 2L))
.build();
TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder()
.sessionId("session2")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key3", 3L, "key4", 4L))
.build();
expectedSubs.add(attrSubscription1);
expectedSubs.add(attrSubscription2);
subsInfo.add(attrSubscription1);
subsInfo.add(attrSubscription2);
assertEquals(expectedSubs, subsInfo.getSubs());
TbEntitySubEvent updatedEvent = subsInfo.remove(attrSubscription1);
expectedSubs.remove(attrSubscription1);
assertNotNull(updatedEvent);
assertEquals(expectedSubs, subsInfo.getSubs());
checkEvent(updatedEvent, expectedSubs, ComponentLifecycleEvent.UPDATED);
TbEntitySubEvent deletedEvent = subsInfo.remove(attrSubscription2);
expectedSubs.remove(attrSubscription2);
assertNotNull(deletedEvent);
assertEquals(expectedSubs, subsInfo.getSubs());
checkEvent(deletedEvent, expectedSubs, ComponentLifecycleEvent.DELETED);
assertTrue(subsInfo.isEmpty());
}
@Test
public void removeAllTest() {
TbEntityLocalSubsInfo subsInfo = createSubsInfo();
TenantId tenantId = subsInfo.getTenantId();
EntityId entityId = subsInfo.getEntityId();
TbAttributeSubscription attrSubscription1 = TbAttributeSubscription.builder()
.sessionId("session1")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key1", 1L, "key2", 2L))
.build();
TbAttributeSubscription attrSubscription2 = TbAttributeSubscription.builder()
.sessionId("session2")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key3", 3L, "key4", 4L))
.build();
TbAttributeSubscription attrSubscription3 = TbAttributeSubscription.builder()
.sessionId("session3")
.tenantId(tenantId)
.entityId(entityId)
.keyStates(Map.of("key5", 5L, "key6", 6L))
.build();
subsInfo.add(attrSubscription1);
subsInfo.add(attrSubscription2);
subsInfo.add(attrSubscription3);
assertFalse(subsInfo.isEmpty());
TbEntitySubEvent updatedEvent = subsInfo.removeAll(List.of(attrSubscription1, attrSubscription2));
assertNotNull(updatedEvent);
checkEvent(updatedEvent, Set.of(attrSubscription3), ComponentLifecycleEvent.UPDATED);
assertFalse(subsInfo.isEmpty());
TbEntitySubEvent deletedEvent = subsInfo.removeAll(List.of(attrSubscription3));
assertNotNull(deletedEvent);
checkEvent(deletedEvent, null, ComponentLifecycleEvent.DELETED);
assertTrue(subsInfo.isEmpty());
}
private TbEntityLocalSubsInfo createSubsInfo() {
return new TbEntityLocalSubsInfo(new TenantId(UUID.randomUUID()), new DeviceId(UUID.randomUUID()));
}
private void checkEvent(TbEntitySubEvent event, Set<TbAttributeSubscription> expectedSubs, ComponentLifecycleEvent expectedType) {
assertEquals(expectedType, event.getType());
TbSubscriptionsInfo info = event.getInfo();
if (event.getType() == ComponentLifecycleEvent.DELETED) {
assertNull(info);
return;
}
assertNotNull(info);
assertFalse(info.notifications);
assertFalse(info.alarms);
assertFalse(info.attrAllKeys);
assertFalse(info.tsAllKeys);
assertNull(info.tsKeys);
assertEquals(getAttrKeys(expectedSubs), info.attrKeys);
}
private Set<String> getAttrKeys(Set<TbAttributeSubscription> attributeSubscriptions) {
return attributeSubscriptions.stream().map(s -> s.getKeyStates().keySet()).flatMap(Collection::stream).collect(Collectors.toSet());
}
}