diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 7fa5cc399e..32e23fddad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -100,10 +100,7 @@ public final class EdgeGrpcSession implements Closeable { private final Consumer sessionCloseListener; private final ObjectMapper mapper; - private final Map pendingMsgsMap = new LinkedHashMap<>(); - // TODO: voba - global future - possible refactoring - private SettableFuture sendDownlinkMsgsFuture; - private ScheduledFuture scheduledSendDownlinkTask; + private final EdgeSessionState sessionState = new EdgeSessionState(); private EdgeContextComponent ctx; private Edge edge; @@ -256,17 +253,17 @@ public final class EdgeGrpcSession implements Closeable { private void onDownlinkResponse(DownlinkResponseMsg msg) { try { if (msg.getSuccess()) { - pendingMsgsMap.remove(msg.getDownlinkMsgId()); + sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg); } else { log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg()); } - if (pendingMsgsMap.size() == 0) { + if (sessionState.getPendingMsgsMap().isEmpty()) { log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg); - if (scheduledSendDownlinkTask != null) { - scheduledSendDownlinkTask.cancel(false); + if (sessionState.getScheduledSendDownlinkTask() != null) { + sessionState.getScheduledSendDownlinkTask().cancel(false); } - sendDownlinkMsgsFuture.set(null); + sessionState.getSendDownlinkMsgsFuture().set(null); } } catch (Exception e) { log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); @@ -389,27 +386,27 @@ public final class EdgeGrpcSession implements Closeable { } private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { - if (sendDownlinkMsgsFuture != null && !sendDownlinkMsgsFuture.isDone()) { + if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) { String erroMsg = "[" + this.sessionId + "] Previous send downdlink future was not properly completed, stopping it now"; log.error(erroMsg); - sendDownlinkMsgsFuture.setException(new RuntimeException(erroMsg)); + sessionState.getSendDownlinkMsgsFuture().setException(new RuntimeException(erroMsg)); } - sendDownlinkMsgsFuture = SettableFuture.create(); - pendingMsgsMap.clear(); - downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); + sessionState.setSendDownlinkMsgsFuture(SettableFuture.create()); + sessionState.getPendingMsgsMap().clear(); + downlinkMsgsPack.forEach(msg -> sessionState.getPendingMsgsMap().put(msg.getDownlinkMsgId(), msg)); scheduleDownlinkMsgsPackSend(true); - return sendDownlinkMsgsFuture; + return sessionState.getSendDownlinkMsgsFuture(); } private void scheduleDownlinkMsgsPackSend(boolean firstRun) { Runnable sendDownlinkMsgsTask = () -> { try { - if (isConnected() && pendingMsgsMap.values().size() > 0) { + if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) { if (!firstRun) { - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values()); + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, sessionState.getPendingMsgsMap().values()); } - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size()); - List copy = new ArrayList<>(pendingMsgsMap.values()); + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, sessionState.getPendingMsgsMap().values().size()); + List copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); for (DownlinkMsg downlinkMsg : copy) { sendDownlinkMsg(ResponseMsg.newBuilder() .setDownlinkMsg(downlinkMsg) @@ -417,17 +414,22 @@ public final class EdgeGrpcSession implements Closeable { } scheduleDownlinkMsgsPackSend(false); } else { - sendDownlinkMsgsFuture.set(null); + sessionState.getSendDownlinkMsgsFuture().set(null); } } catch (Exception e) { - sendDownlinkMsgsFuture.setException(e); + sessionState.getSendDownlinkMsgsFuture().setException(e); } }; if (firstRun) { sendDownlinkExecutorService.submit(sendDownlinkMsgsTask); } else { - scheduledSendDownlinkTask = sendDownlinkExecutorService.schedule(sendDownlinkMsgsTask, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS); + sessionState.setScheduledSendDownlinkTask( + sendDownlinkExecutorService.schedule( + sendDownlinkMsgsTask, + ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), + TimeUnit.MILLISECONDS) + ); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java new file mode 100644 index 0000000000..986b271b41 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc; + +import com.google.common.util.concurrent.SettableFuture; +import lombok.Data; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; + +@Data +public class EdgeSessionState { + + private final Map pendingMsgsMap = new LinkedHashMap<>(); + private SettableFuture sendDownlinkMsgsFuture; + private ScheduledFuture scheduledSendDownlinkTask; +}