diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 71f47e7a69..5a3587fb1b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -242,7 +242,7 @@ public final class EdgeGrpcSession implements Closeable { EdgeEvent customerEdgeEvent = EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null); DownlinkMsg customerDownlinkMsg = convertToDownlinkMsg(customerEdgeEvent); - sendSingleDownlinkMsg(customerDownlinkMsg); + sendDownlinkMsgsPack(Collections.singletonList(customerDownlinkMsg)); startProcessingEdgeEvents(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); } @@ -256,7 +256,7 @@ public final class EdgeGrpcSession implements Closeable { DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) .build(); - sendSingleDownlinkMsg(syncCompleteDownlinkMsg); + sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)); } catch (Exception e) { log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e); } @@ -345,59 +345,37 @@ public final class EdgeGrpcSession implements Closeable { PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()); PageData pageData; UUID ifOffset = null; - boolean success = true; + boolean success; do { pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge.getId(), pageLink); if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); - - success = processEdgeEventsPack(pageData.getData()); - + List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); + success = sendDownlinkMsgsPack(downlinkMsgsPack); ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); - } - if (isConnected() && (!success || pageData.hasNext())) { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); - } catch (InterruptedException e) { - log.error("[{}] Error during sleep between batches", this.sessionId, e); - } if (success) { pageLink = pageLink.nextPageLink(); } + } else { + log.trace("[{}] no event(s) found. Stop processing edge events", this.sessionId); } - } while (isConnected() && (!success || pageData.hasNext())); + } while (isConnected() && pageData.hasNext()); return ifOffset; } - private boolean processEdgeEventsPack(List edgeEvents) throws InterruptedException { - List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); - - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); - - latch = new CountDownLatch(downlinkMsgsPack.size()); - for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { - sendDownlinkMsg(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); - } - - boolean success = latch.await(10, TimeUnit.SECONDS); - if (!success) { - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); - } - return success; - } - - private void sendSingleDownlinkMsg(DownlinkMsg downlinkMsg) throws InterruptedException { + private boolean sendDownlinkMsgsPack(List downlinkMsgsPack) throws InterruptedException { boolean success; do { - latch = new CountDownLatch(1); - sendDownlinkMsg(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); + latch = new CountDownLatch(downlinkMsgsPack.size()); + for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { + sendDownlinkMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); + } success = latch.await(10, TimeUnit.SECONDS); if (!success) { - log.warn("[{}] Failed to deliver single downlink msg!", this.sessionId); + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); } if (isConnected() && !success) { try { @@ -407,6 +385,7 @@ public final class EdgeGrpcSession implements Closeable { } } } while (isConnected() && !success); + return success; } private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {