Fix order of core notification processing
This commit is contained in:
parent
3025752d0c
commit
2e10b62535
@ -59,7 +59,6 @@ import java.util.concurrent.CountDownLatch;
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -129,12 +128,15 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
if (msgs.isEmpty()) {
|
if (msgs.isEmpty()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = msgs.stream().collect(
|
List<IdMsgPair<N>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
|
||||||
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
|
ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = orderedMsgList.stream().collect(
|
||||||
|
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
|
||||||
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
|
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
|
||||||
TbPackProcessingContext<TbProtoQueueMsg<N>> ctx = new TbPackProcessingContext<>(
|
TbPackProcessingContext<TbProtoQueueMsg<N>> ctx = new TbPackProcessingContext<>(
|
||||||
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
|
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
|
||||||
pendingMap.forEach((id, msg) -> {
|
orderedMsgList.forEach(element -> {
|
||||||
|
UUID id = element.getUuid();
|
||||||
|
TbProtoQueueMsg<N> msg = element.getMsg();
|
||||||
log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
|
log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
|
||||||
TbCallback callback = new TbPackCallback<>(id, ctx);
|
TbCallback callback = new TbPackCallback<>(id, ctx);
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -173,6 +173,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
||||||
|
log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo);
|
||||||
entityUpdates.put(entityId, entityUpdatesInfo);
|
entityUpdates.put(entityId, entityUpdatesInfo);
|
||||||
Set<TbSubscription<?>> pendingSubs = null;
|
Set<TbSubscription<?>> pendingSubs = null;
|
||||||
subsLock.lock();
|
subsLock.lock();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user