diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 37d3c42c8a..e118a0b0c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -54,7 +54,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -103,11 +102,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private Server server; - private ScheduledExecutorService scheduler; + private ScheduledExecutorService edgeEventProcessingExecutorService; - private ExecutorService syncExecutorService; - - private ScheduledExecutorService sendScheduler; + private ScheduledExecutorService sendDownlinkExecutorService; @PostConstruct public void init() { @@ -134,10 +131,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.error("Failed to start Edge RPC server!", e); throw new RuntimeException("Failed to start Edge RPC server!"); } - this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); - this.sendScheduler = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); - this.syncExecutorService = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("edge-sync")); + this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); + this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); log.info("Edge RPC service initialized!"); } @@ -154,20 +149,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionEdgeEventChecks.remove(edgeId); } } - if (scheduler != null) { - scheduler.shutdownNow(); + if (edgeEventProcessingExecutorService != null) { + edgeEventProcessingExecutorService.shutdownNow(); } - if (sendScheduler != null) { - sendScheduler.shutdownNow(); - } - if (syncExecutorService != null) { - syncExecutorService.shutdownNow(); + if (sendDownlinkExecutorService != null) { + sendDownlinkExecutorService.shutdownNow(); } } @Override public StreamObserver handleMsgs(StreamObserver outputStream) { - return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, syncExecutorService, sendScheduler).getInputStream(); + return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, sendDownlinkExecutorService).getInputStream(); } @Override @@ -245,7 +237,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i EdgeId edgeId = session.getEdge().getId(); UUID tenantId = session.getEdge().getTenantId().getId(); if (sessions.containsKey(edgeId)) { - ScheduledFuture schedule = scheduler.schedule(() -> { + ScheduledFuture edgeEventCheckTask = edgeEventProcessingExecutorService.schedule(() -> { try { final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); newEventLock.lock(); @@ -275,7 +267,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), e); } }, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS); - sessionEdgeEventChecks.put(edgeId, schedule); + sessionEdgeEventChecks.put(edgeId, edgeEventCheckTask); log.trace("[{}] Check edge event scheduled for edge [{}]", tenantId, edgeId.getId()); } else { log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]", diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f16df05af8..74edc66084 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -79,7 +79,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -103,8 +102,9 @@ public final class EdgeGrpcSession implements Closeable { private final ObjectMapper mapper; private final Map pendingMsgsMap = new LinkedHashMap<>(); - private SettableFuture pendingFuture; - private ScheduledFuture sendSchedule; + // TODO: voba - global future - possible refactoring + private SettableFuture sendDownlinkMsgsFuture; + private ScheduledFuture scheduledSendDownlinkTask; private EdgeContextComponent ctx; private Edge edge; @@ -113,20 +113,17 @@ public final class EdgeGrpcSession implements Closeable { private boolean connected; private boolean syncCompleted; - private ExecutorService syncExecutorService; - - private ScheduledExecutorService sendScheduler; + private ScheduledExecutorService sendDownlinkExecutorService; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, - Consumer sessionCloseListener, ObjectMapper mapper, ExecutorService syncExecutorService, ScheduledExecutorService sendScheduler) { + Consumer sessionCloseListener, ObjectMapper mapper, ScheduledExecutorService sendDownlinkExecutorService) { this.sessionId = UUID.randomUUID(); this.ctx = ctx; this.outputStream = outputStream; this.sessionOpenListener = sessionOpenListener; this.sessionCloseListener = sessionCloseListener; this.mapper = mapper; - this.syncExecutorService = syncExecutorService; - this.sendScheduler = sendScheduler; + this.sendDownlinkExecutorService = sendDownlinkExecutorService; initInputStream(); } @@ -194,13 +191,7 @@ public final class EdgeGrpcSession implements Closeable { public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); syncCompleted = false; - syncExecutorService.submit(() -> { - try { - doSync(new EdgeSyncCursor(ctx, edge)); - } catch (Exception e) { - log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e); - } - }); + doSync(new EdgeSyncCursor(ctx, edge)); } private void doSync(EdgeSyncCursor cursor) { @@ -273,10 +264,10 @@ public final class EdgeGrpcSession implements Closeable { } if (pendingMsgsMap.size() == 0) { log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg); - if (sendSchedule != null) { - sendSchedule.cancel(false); + if (scheduledSendDownlinkTask != null) { + scheduledSendDownlinkTask.cancel(false); } - pendingFuture.set(null); + sendDownlinkMsgsFuture.set(null); } } catch (Exception e) { log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); @@ -399,21 +390,20 @@ public final class EdgeGrpcSession implements Closeable { } private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { - SettableFuture result = SettableFuture.create(); + sendDownlinkMsgsFuture = SettableFuture.create(); downlinkMsgsPackLock.lock(); try { pendingMsgsMap.clear(); downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); - scheduleDownlinkMsgsPackSend(true, result); + scheduleDownlinkMsgsPackSend(true); } finally { downlinkMsgsPackLock.unlock(); } - return result; + return sendDownlinkMsgsFuture; } - private void scheduleDownlinkMsgsPackSend(boolean firstRun, SettableFuture futureResult) { - pendingFuture = futureResult; - Runnable runnable = () -> { + private void scheduleDownlinkMsgsPackSend(boolean firstRun) { + Runnable sendDownlinkMsgsTask = () -> { try { if (isConnected() && pendingMsgsMap.values().size() > 0) { if (!firstRun) { @@ -426,19 +416,19 @@ public final class EdgeGrpcSession implements Closeable { .setDownlinkMsg(downlinkMsg) .build()); } - scheduleDownlinkMsgsPackSend(false, futureResult); + scheduleDownlinkMsgsPackSend(false); } else { - futureResult.set(null); + sendDownlinkMsgsFuture.set(null); } } catch (Exception e) { - futureResult.setException(e); + sendDownlinkMsgsFuture.setException(e); } }; if (firstRun) { - syncExecutorService.submit(runnable); + sendDownlinkExecutorService.submit(sendDownlinkMsgsTask); } else { - sendSchedule = sendScheduler.schedule(runnable, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS); + scheduledSendDownlinkTask = sendDownlinkExecutorService.schedule(sendDownlinkMsgsTask, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS); } }