Refactor - rename variables. Remove redundant executor service

This commit is contained in:
Volodymyr Babak 2021-07-09 10:53:32 +03:00
parent 47311663cd
commit 00c3e3ebeb
2 changed files with 31 additions and 49 deletions

View File

@ -54,7 +54,6 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -103,11 +102,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private Server server; private Server server;
private ScheduledExecutorService scheduler; private ScheduledExecutorService edgeEventProcessingExecutorService;
private ExecutorService syncExecutorService; private ScheduledExecutorService sendDownlinkExecutorService;
private ScheduledExecutorService sendScheduler;
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -134,10 +131,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.error("Failed to start Edge RPC server!", e); log.error("Failed to start Edge RPC server!", e);
throw new RuntimeException("Failed to start Edge RPC server!"); throw new RuntimeException("Failed to start Edge RPC server!");
} }
this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
this.sendScheduler = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler"));
this.syncExecutorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("edge-sync"));
log.info("Edge RPC service initialized!"); log.info("Edge RPC service initialized!");
} }
@ -154,20 +149,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
sessionEdgeEventChecks.remove(edgeId); sessionEdgeEventChecks.remove(edgeId);
} }
} }
if (scheduler != null) { if (edgeEventProcessingExecutorService != null) {
scheduler.shutdownNow(); edgeEventProcessingExecutorService.shutdownNow();
} }
if (sendScheduler != null) { if (sendDownlinkExecutorService != null) {
sendScheduler.shutdownNow(); sendDownlinkExecutorService.shutdownNow();
}
if (syncExecutorService != null) {
syncExecutorService.shutdownNow();
} }
} }
@Override @Override
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) { public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, syncExecutorService, sendScheduler).getInputStream(); return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, sendDownlinkExecutorService).getInputStream();
} }
@Override @Override
@ -245,7 +237,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeId edgeId = session.getEdge().getId(); EdgeId edgeId = session.getEdge().getId();
UUID tenantId = session.getEdge().getTenantId().getId(); UUID tenantId = session.getEdge().getTenantId().getId();
if (sessions.containsKey(edgeId)) { if (sessions.containsKey(edgeId)) {
ScheduledFuture<?> schedule = scheduler.schedule(() -> { ScheduledFuture<?> edgeEventCheckTask = edgeEventProcessingExecutorService.schedule(() -> {
try { try {
final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
newEventLock.lock(); newEventLock.lock();
@ -275,7 +267,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), e); log.warn("[{}] Failed to process edge events for edge [{}]!", tenantId, session.getEdge().getId().getId(), e);
} }
}, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS); }, ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval(), TimeUnit.MILLISECONDS);
sessionEdgeEventChecks.put(edgeId, schedule); sessionEdgeEventChecks.put(edgeId, edgeEventCheckTask);
log.trace("[{}] Check edge event scheduled for edge [{}]", tenantId, edgeId.getId()); log.trace("[{}] Check edge event scheduled for edge [{}]", tenantId, edgeId.getId());
} else { } else {
log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]", log.debug("[{}] Session was removed and edge event check schedule must not be started [{}]",

View File

@ -79,7 +79,6 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -103,8 +102,9 @@ public final class EdgeGrpcSession implements Closeable {
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>(); private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>();
private SettableFuture<Void> pendingFuture; // TODO: voba - global future - possible refactoring
private ScheduledFuture<?> sendSchedule; private SettableFuture<Void> sendDownlinkMsgsFuture;
private ScheduledFuture<?> scheduledSendDownlinkTask;
private EdgeContextComponent ctx; private EdgeContextComponent ctx;
private Edge edge; private Edge edge;
@ -113,20 +113,17 @@ public final class EdgeGrpcSession implements Closeable {
private boolean connected; private boolean connected;
private boolean syncCompleted; private boolean syncCompleted;
private ExecutorService syncExecutorService; private ScheduledExecutorService sendDownlinkExecutorService;
private ScheduledExecutorService sendScheduler;
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper, ExecutorService syncExecutorService, ScheduledExecutorService sendScheduler) { Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper, ScheduledExecutorService sendDownlinkExecutorService) {
this.sessionId = UUID.randomUUID(); this.sessionId = UUID.randomUUID();
this.ctx = ctx; this.ctx = ctx;
this.outputStream = outputStream; this.outputStream = outputStream;
this.sessionOpenListener = sessionOpenListener; this.sessionOpenListener = sessionOpenListener;
this.sessionCloseListener = sessionCloseListener; this.sessionCloseListener = sessionCloseListener;
this.mapper = mapper; this.mapper = mapper;
this.syncExecutorService = syncExecutorService; this.sendDownlinkExecutorService = sendDownlinkExecutorService;
this.sendScheduler = sendScheduler;
initInputStream(); initInputStream();
} }
@ -194,13 +191,7 @@ public final class EdgeGrpcSession implements Closeable {
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
syncCompleted = false; syncCompleted = false;
syncExecutorService.submit(() -> { doSync(new EdgeSyncCursor(ctx, edge));
try {
doSync(new EdgeSyncCursor(ctx, edge));
} catch (Exception e) {
log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e);
}
});
} }
private void doSync(EdgeSyncCursor cursor) { private void doSync(EdgeSyncCursor cursor) {
@ -273,10 +264,10 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (pendingMsgsMap.size() == 0) { if (pendingMsgsMap.size() == 0) {
log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg); log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg);
if (sendSchedule != null) { if (scheduledSendDownlinkTask != null) {
sendSchedule.cancel(false); scheduledSendDownlinkTask.cancel(false);
} }
pendingFuture.set(null); sendDownlinkMsgsFuture.set(null);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
@ -399,21 +390,20 @@ public final class EdgeGrpcSession implements Closeable {
} }
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) { private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
SettableFuture<Void> result = SettableFuture.create(); sendDownlinkMsgsFuture = SettableFuture.create();
downlinkMsgsPackLock.lock(); downlinkMsgsPackLock.lock();
try { try {
pendingMsgsMap.clear(); pendingMsgsMap.clear();
downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
scheduleDownlinkMsgsPackSend(true, result); scheduleDownlinkMsgsPackSend(true);
} finally { } finally {
downlinkMsgsPackLock.unlock(); downlinkMsgsPackLock.unlock();
} }
return result; return sendDownlinkMsgsFuture;
} }
private void scheduleDownlinkMsgsPackSend(boolean firstRun, SettableFuture<Void> futureResult) { private void scheduleDownlinkMsgsPackSend(boolean firstRun) {
pendingFuture = futureResult; Runnable sendDownlinkMsgsTask = () -> {
Runnable runnable = () -> {
try { try {
if (isConnected() && pendingMsgsMap.values().size() > 0) { if (isConnected() && pendingMsgsMap.values().size() > 0) {
if (!firstRun) { if (!firstRun) {
@ -426,19 +416,19 @@ public final class EdgeGrpcSession implements Closeable {
.setDownlinkMsg(downlinkMsg) .setDownlinkMsg(downlinkMsg)
.build()); .build());
} }
scheduleDownlinkMsgsPackSend(false, futureResult); scheduleDownlinkMsgsPackSend(false);
} else { } else {
futureResult.set(null); sendDownlinkMsgsFuture.set(null);
} }
} catch (Exception e) { } catch (Exception e) {
futureResult.setException(e); sendDownlinkMsgsFuture.setException(e);
} }
}; };
if (firstRun) { if (firstRun) {
syncExecutorService.submit(runnable); sendDownlinkExecutorService.submit(sendDownlinkMsgsTask);
} else { } else {
sendSchedule = sendScheduler.schedule(runnable, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS); scheduledSendDownlinkTask = sendDownlinkExecutorService.schedule(sendDownlinkMsgsTask, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS);
} }
} }