diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 7cd4ec52eb..091a5a4988 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -59,7 +59,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -129,12 +128,15 @@ public abstract class AbstractConsumerService> pendingMap = msgs.stream().collect( - Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList()); + ConcurrentMap> pendingMap = orderedMsgList.stream().collect( + Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); CountDownLatch processingTimeoutLatch = new CountDownLatch(1); TbPackProcessingContext> ctx = new TbPackProcessingContext<>( processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); - pendingMap.forEach((id, msg) -> { + orderedMsgList.forEach(element -> { + UUID id = element.getUuid(); + TbProtoQueueMsg msg = element.getMsg(); log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue()); TbCallback callback = new TbPackCallback<>(id, ctx); try { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 1bb452f4a9..3cf4c1c0e0 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -173,6 +173,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) { + log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo); entityUpdates.put(entityId, entityUpdatesInfo); Set> pendingSubs = null; subsLock.lock();