diff --git a/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json index 740d64627e..a065279cce 100644 --- a/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json @@ -14,8 +14,8 @@ { "additionalInfo": { "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", - "layoutX": 203, - "layoutY": 259 + "layoutX": 187, + "layoutY": 468 }, "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "name": "Device Profile Node", @@ -99,14 +99,14 @@ }, { "additionalInfo": { - "layoutX": 1134, - "layoutY": 132 + "layoutX": 1129, + "layoutY": 52 }, "type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "name": "Push to cloud", "debugMode": false, "configuration": { - "version": 0 + "scope": "SERVER_SCOPE" } } ], @@ -151,6 +151,11 @@ "toIndex": 4, "type": "RPC Request from Device" }, + { + "fromIndex": 3, + "toIndex": 7, + "type": "Attributes Updated" + }, { "fromIndex": 4, "toIndex": 7, diff --git a/application/src/main/data/json/tenant/edge_management/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/tenant/edge_management/rule_chains/edge_root_rule_chain.json index 740d64627e..a065279cce 100644 --- a/application/src/main/data/json/tenant/edge_management/rule_chains/edge_root_rule_chain.json +++ b/application/src/main/data/json/tenant/edge_management/rule_chains/edge_root_rule_chain.json @@ -14,8 +14,8 @@ { "additionalInfo": { "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", - "layoutX": 203, - "layoutY": 259 + "layoutX": 187, + "layoutY": 468 }, "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "name": "Device Profile Node", @@ -99,14 +99,14 @@ }, { "additionalInfo": { - "layoutX": 1134, - "layoutY": 132 + "layoutX": 1129, + "layoutY": 52 }, "type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", "name": "Push to cloud", "debugMode": false, "configuration": { - "version": 0 + "scope": "SERVER_SCOPE" } } ], @@ -151,6 +151,11 @@ "toIndex": 4, "type": "RPC Request from Device" }, + { + "fromIndex": 3, + "toIndex": 7, + "type": "Attributes Updated" + }, { "fromIndex": 4, "toIndex": 7, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f3626c0650..15f591694a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -83,8 +83,10 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve import java.io.Closeable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -103,6 +105,7 @@ import java.util.stream.Collectors; public final class EdgeGrpcSession implements Closeable { private static final ReentrantLock downlinkMsgLock = new ReentrantLock(); + private static final ReentrantLock downlinkMsgsPackLock = new ReentrantLock(); private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; @@ -111,7 +114,7 @@ public final class EdgeGrpcSession implements Closeable { private final Consumer sessionCloseListener; private final ObjectMapper mapper; - private final Map pendingMsgsMap; + private final Map pendingMsgsMap = new HashMap<>(); private EdgeContextComponent ctx; private Edge edge; @@ -133,7 +136,6 @@ public final class EdgeGrpcSession implements Closeable { this.sessionCloseListener = sessionCloseListener; this.mapper = mapper; this.syncExecutorService = syncExecutorService; - this.pendingMsgsMap = new HashMap<>(); initInputStream(); } @@ -197,6 +199,7 @@ public final class EdgeGrpcSession implements Closeable { public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); + syncCompleted = false; syncExecutorService.submit(() -> { try { startProcessingEdgeEvents(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); @@ -336,30 +339,36 @@ public final class EdgeGrpcSession implements Closeable { } private boolean sendDownlinkMsgsPack(List downlinkMsgsPack) throws InterruptedException { - boolean success; - pendingMsgsMap.clear(); - downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); - do { - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size()); - latch = new CountDownLatch(pendingMsgsMap.values().size()); - for (DownlinkMsg downlinkMsg : pendingMsgsMap.values()) { - sendDownlinkMsg(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); - } - success = latch.await(10, TimeUnit.SECONDS); - if (!success) { - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values()); - } - if (isConnected() && !success) { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); - } catch (InterruptedException e) { - log.error("[{}] Error during sleep between next send of single downlink msg", this.sessionId, e); + try { + downlinkMsgsPackLock.lock(); + boolean success; + pendingMsgsMap.clear(); + downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); + do { + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size()); + latch = new CountDownLatch(pendingMsgsMap.values().size()); + Collection copy = new ArrayList<>(pendingMsgsMap.values()); + for (DownlinkMsg downlinkMsg : copy) { + sendDownlinkMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); } - } - } while (isConnected() && !success); - return success; + success = latch.await(10, TimeUnit.SECONDS); + if (!success || pendingMsgsMap.values().size() > 0) { + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values()); + } + if (isConnected() && (!success || pendingMsgsMap.values().size() > 0)) { + try { + Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); + } catch (InterruptedException e) { + log.error("[{}] Error during sleep between batches", this.sessionId, e); + } + } + } while (isConnected() && (!success || pendingMsgsMap.values().size() > 0)); + return success; + } finally { + downlinkMsgsPackLock.unlock(); + } } private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java index 909c2a3b03..bf71f94847 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java +++ b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java @@ -232,6 +232,7 @@ public class InstallScripts { try { createDefaultRuleChains(tenantId); createDefaultRuleChain(tenantId, "Thermostat"); + createDefaultEdgeRuleChains(tenantId); } catch (Exception e) { log.error("Unable to load dashboard from json", e); throw new RuntimeException("Unable to load dashboard from json", e);