From 4890976b9f9e7353454b4186e64e8ac9c1b6cf05 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 16 Aug 2022 18:52:31 +0300 Subject: [PATCH 1/2] Start regular edge event process after sync completed --- .../thingsboard/server/service/edge/EdgeContextComponent.java | 4 ++++ .../thingsboard/server/service/edge/rpc/EdgeGrpcSession.java | 1 + 2 files changed, 5 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index d7e861a9fb..29ff49d626 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -20,6 +20,7 @@ import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -61,6 +62,9 @@ import org.thingsboard.server.service.executors.GrpcCallbackExecutorService; @Lazy public class EdgeContextComponent { + @Autowired + private TbClusterService clusterService; + @Autowired private EdgeService edgeService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 2f9b4c0498..3f8950f281 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -211,6 +211,7 @@ public final class EdgeGrpcSession implements Closeable { @Override public void onSuccess(Void result) { syncCompleted = true; + ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId()); } @Override From e6ec672e509bc439286f8deb066ae5e54e7a658c Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 17 Aug 2022 13:22:17 +0300 Subject: [PATCH 2/2] Added test to verify that regular messages are processed by edge service after sync complete --- .../thingsboard/server/edge/BaseEdgeTest.java | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index d707bfb38b..6246492d25 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -201,12 +201,36 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { installation(); edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); - edgeImitator.expectMessageAmount(14); + edgeImitator.expectMessageAmount(15); edgeImitator.connect(); + requestEdgeRuleChainMetadata(); + verifyEdgeConnectionAndInitialData(); } + private void requestEdgeRuleChainMetadata() throws Exception { + RuleChainId rootRuleChainId = getEdgeRootRuleChainId(); + RuleChainMetadataRequestMsg.Builder builder = RuleChainMetadataRequestMsg.newBuilder() + .setRuleChainIdMSB(rootRuleChainId.getId().getMostSignificantBits()) + .setRuleChainIdLSB(rootRuleChainId.getId().getLeastSignificantBits()); + testAutoGeneratedCodeByProtobuf(builder); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder() + .addRuleChainMetadataRequestMsg(builder.build()); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + } + + private RuleChainId getEdgeRootRuleChainId() throws Exception { + List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?", + new TypeReference>() {}, new PageLink(100)).getData(); + for (RuleChain edgeRuleChain : edgeRuleChains) { + if (edgeRuleChain.isRoot()) { + return edgeRuleChain.getId(); + } + } + throw new RuntimeException("Root rule chain not found"); + } + @After public void afterTest() throws Exception { try { @@ -281,7 +305,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { DeviceUpdateMsg deviceUpdateMsg = deviceUpdateMsgOpt.get(); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceUpdateMsg.getMsgType()); UUID deviceUUID = new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()); - Device device = doGet("/api/device/" + deviceUUID.toString(), Device.class); + Device device = doGet("/api/device/" + deviceUUID, Device.class); Assert.assertNotNull(device); List edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/devices?", new TypeReference>() {}, new PageLink(100)).getData(); @@ -295,7 +319,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { DeviceProfileUpdateMsg deviceProfileUpdateMsg = deviceProfileUpdateMsgOpt.get(); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType()); UUID deviceProfileUUID = new UUID(deviceProfileUpdateMsg.getIdMSB(), deviceProfileUpdateMsg.getIdLSB()); - DeviceProfile deviceProfile = doGet("/api/deviceProfile/" + deviceProfileUUID.toString(), DeviceProfile.class); + DeviceProfile deviceProfile = doGet("/api/deviceProfile/" + deviceProfileUUID, DeviceProfile.class); Assert.assertNotNull(deviceProfile); Assert.assertNotNull(deviceProfile.getProfileData()); Assert.assertNotNull(deviceProfile.getProfileData().getAlarms()); @@ -321,7 +345,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get(); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainUpdateMsg.getMsgType()); UUID ruleChainUUID = new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB()); - RuleChain ruleChain = doGet("/api/ruleChain/" + ruleChainUUID.toString(), RuleChain.class); + RuleChain ruleChain = doGet("/api/ruleChain/" + ruleChainUUID, RuleChain.class); Assert.assertNotNull(ruleChain); List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?", new TypeReference>() {}, new PageLink(100)).getData(); @@ -329,6 +353,13 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { testAutoGeneratedCodeByProtobuf(ruleChainUpdateMsg); + Optional ruleChainMetadataUpdateOpt = edgeImitator.findMessageByType(RuleChainMetadataUpdateMsg.class); + Assert.assertTrue(ruleChainMetadataUpdateOpt.isPresent()); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ruleChainMetadataUpdateOpt.get(); + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsg.getMsgType()); + Assert.assertEquals(ruleChainUpdateMsg.getIdMSB(), ruleChainMetadataUpdateMsg.getRuleChainIdMSB()); + Assert.assertEquals(ruleChainUpdateMsg.getIdLSB(), ruleChainMetadataUpdateMsg.getRuleChainIdLSB()); + validateAdminSettings(); }