Edge session send donwlink msgs refactoring

This commit is contained in:
Volodymyr Babak 2021-05-18 11:03:52 +03:00
parent 71791c6847
commit bb064cbbbd

View File

@ -242,7 +242,7 @@ public final class EdgeGrpcSession implements Closeable {
EdgeEvent customerEdgeEvent = EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(),
EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
DownlinkMsg customerDownlinkMsg = convertToDownlinkMsg(customerEdgeEvent);
sendSingleDownlinkMsg(customerDownlinkMsg);
sendDownlinkMsgsPack(Collections.singletonList(customerDownlinkMsg));
startProcessingEdgeEvents(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));
}
@ -256,7 +256,7 @@ public final class EdgeGrpcSession implements Closeable {
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
sendSingleDownlinkMsg(syncCompleteDownlinkMsg);
sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg));
} catch (Exception e) {
log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e);
}
@ -345,59 +345,37 @@ public final class EdgeGrpcSession implements Closeable {
PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount());
PageData<EdgeEvent> pageData;
UUID ifOffset = null;
boolean success = true;
boolean success;
do {
pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge.getId(), pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
success = processEdgeEventsPack(pageData.getData());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
success = sendDownlinkMsgsPack(downlinkMsgsPack);
ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
}
if (isConnected() && (!success || pageData.hasNext())) {
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("[{}] Error during sleep between batches", this.sessionId, e);
}
if (success) {
pageLink = pageLink.nextPageLink();
}
} else {
log.trace("[{}] no event(s) found. Stop processing edge events", this.sessionId);
}
} while (isConnected() && (!success || pageData.hasNext()));
} while (isConnected() && pageData.hasNext());
return ifOffset;
}
private boolean processEdgeEventsPack(List<EdgeEvent> edgeEvents) throws InterruptedException {
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
private boolean sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) throws InterruptedException {
boolean success;
do {
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size());
latch = new CountDownLatch(downlinkMsgsPack.size());
for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {
sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg)
.build());
}
boolean success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack);
}
return success;
}
private void sendSingleDownlinkMsg(DownlinkMsg downlinkMsg) throws InterruptedException {
boolean success;
do {
latch = new CountDownLatch(1);
sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg)
.build());
success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
log.warn("[{}] Failed to deliver single downlink msg!", this.sessionId);
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack);
}
if (isConnected() && !success) {
try {
@ -407,6 +385,7 @@ public final class EdgeGrpcSession implements Closeable {
}
}
} while (isConnected() && !success);
return success;
}
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {