diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 74edc66084..7fa5cc399e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -92,7 +92,6 @@ import java.util.stream.Collectors; public final class EdgeGrpcSession implements Closeable { private static final ReentrantLock downlinkMsgLock = new ReentrantLock(); - private static final ReentrantLock downlinkMsgsPackLock = new ReentrantLock(); private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; @@ -390,15 +389,15 @@ public final class EdgeGrpcSession implements Closeable { } private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { - sendDownlinkMsgsFuture = SettableFuture.create(); - downlinkMsgsPackLock.lock(); - try { - pendingMsgsMap.clear(); - downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); - scheduleDownlinkMsgsPackSend(true); - } finally { - downlinkMsgsPackLock.unlock(); + if (sendDownlinkMsgsFuture != null && !sendDownlinkMsgsFuture.isDone()) { + String erroMsg = "[" + this.sessionId + "] Previous send downdlink future was not properly completed, stopping it now"; + log.error(erroMsg); + sendDownlinkMsgsFuture.setException(new RuntimeException(erroMsg)); } + sendDownlinkMsgsFuture = SettableFuture.create(); + pendingMsgsMap.clear(); + downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); + scheduleDownlinkMsgsPackSend(true); return sendDownlinkMsgsFuture; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index 85ef95ee12..cb7916965f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -32,6 +32,7 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve import java.util.LinkedList; import java.util.List; +import java.util.NoSuchElementException; public class EdgeSyncCursor { @@ -59,13 +60,12 @@ public class EdgeSyncCursor { } public EdgeEventFetcher getNext() { - if (hasNext()) { - EdgeEventFetcher edgeEventFetcher = fetchers.get(currentIdx); - currentIdx++; - return edgeEventFetcher; - } else { - throw new IndexOutOfBoundsException(); + if (!hasNext()) { + throw new NoSuchElementException(); } + EdgeEventFetcher edgeEventFetcher = fetchers.get(currentIdx); + currentIdx++; + return edgeEventFetcher; } public int getCurrentIdx() {