Removed redundant lock

This commit is contained in:
Volodymyr Babak 2021-07-09 11:06:29 +03:00
parent 00c3e3ebeb
commit 9af40455bb
2 changed files with 14 additions and 15 deletions

View File

@ -92,7 +92,6 @@ import java.util.stream.Collectors;
public final class EdgeGrpcSession implements Closeable { public final class EdgeGrpcSession implements Closeable {
private static final ReentrantLock downlinkMsgLock = new ReentrantLock(); private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
private static final ReentrantLock downlinkMsgsPackLock = new ReentrantLock();
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
@ -390,15 +389,15 @@ public final class EdgeGrpcSession implements Closeable {
} }
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) { private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
sendDownlinkMsgsFuture = SettableFuture.create(); if (sendDownlinkMsgsFuture != null && !sendDownlinkMsgsFuture.isDone()) {
downlinkMsgsPackLock.lock(); String erroMsg = "[" + this.sessionId + "] Previous send downdlink future was not properly completed, stopping it now";
try { log.error(erroMsg);
pendingMsgsMap.clear(); sendDownlinkMsgsFuture.setException(new RuntimeException(erroMsg));
downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
scheduleDownlinkMsgsPackSend(true);
} finally {
downlinkMsgsPackLock.unlock();
} }
sendDownlinkMsgsFuture = SettableFuture.create();
pendingMsgsMap.clear();
downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
scheduleDownlinkMsgsPackSend(true);
return sendDownlinkMsgsFuture; return sendDownlinkMsgsFuture;
} }

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
public class EdgeSyncCursor { public class EdgeSyncCursor {
@ -59,13 +60,12 @@ public class EdgeSyncCursor {
} }
public EdgeEventFetcher getNext() { public EdgeEventFetcher getNext() {
if (hasNext()) { if (!hasNext()) {
EdgeEventFetcher edgeEventFetcher = fetchers.get(currentIdx); throw new NoSuchElementException();
currentIdx++;
return edgeEventFetcher;
} else {
throw new IndexOutOfBoundsException();
} }
EdgeEventFetcher edgeEventFetcher = fetchers.get(currentIdx);
currentIdx++;
return edgeEventFetcher;
} }
public int getCurrentIdx() { public int getCurrentIdx() {