use threadsafe collection during subscription

This commit is contained in:
vparomskiy 2018-06-11 18:13:02 +03:00
parent 5ba6b67cd5
commit b13b810e4e

View File

@ -60,7 +60,6 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -68,6 +67,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -120,8 +120,8 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
} }
} }
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>(); private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>(); private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new ConcurrentHashMap<>();
@Override @Override
public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) { public void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub) {
@ -453,9 +453,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
} }
private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) { private void registerSubscription(String sessionId, EntityId entityId, Subscription subscription) {
Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> new HashSet<>()); Set<Subscription> deviceSubscriptions = subscriptionsByEntityId.computeIfAbsent(entityId, k -> ConcurrentHashMap.newKeySet());
deviceSubscriptions.add(subscription); deviceSubscriptions.add(subscription);
Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new HashMap<>()); Map<Integer, Subscription> sessionSubscriptions = subscriptionsByWsSessionId.computeIfAbsent(sessionId, k -> new ConcurrentHashMap<>());
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);
} }