Merge pull request #7815 from volodymyr-babak/edge/rule-chain-fix
[3.4.4][Bug][Edge] Correctly set root rule chain on initial sync
This commit is contained in:
commit
93da604165
@ -15,8 +15,10 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc.fetch;
|
package org.thingsboard.server.service.edge.rpc.fetch;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.EdgeUtils;
|
import org.thingsboard.server.common.data.EdgeUtils;
|
||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
@ -28,6 +30,8 @@ import org.thingsboard.server.common.data.page.PageLink;
|
|||||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.service.edge.DefaultEdgeNotificationService.EDGE_IS_ROOT_BODY_KEY;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher<RuleChain> {
|
public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher<RuleChain> {
|
||||||
@ -41,7 +45,13 @@ public class RuleChainsEdgeEventFetcher extends BasePageableEdgeEventFetcher<Rul
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, RuleChain ruleChain) {
|
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, RuleChain ruleChain) {
|
||||||
|
ObjectNode isRootBody = JacksonUtil.OBJECT_MAPPER.createObjectNode();
|
||||||
|
boolean isRoot = false;
|
||||||
|
try {
|
||||||
|
isRoot = ruleChain.getId().equals(edge.getRootRuleChainId());
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
isRootBody.put(EDGE_IS_ROOT_BODY_KEY, isRoot);
|
||||||
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN,
|
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN,
|
||||||
EdgeEventActionType.ADDED, ruleChain.getId(), null);
|
EdgeEventActionType.ADDED, ruleChain.getId(), isRootBody);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -48,6 +48,9 @@ public class DefaultTbEdgeService extends AbstractTbEntityService implements TbE
|
|||||||
ActionType actionType = edge.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
|
ActionType actionType = edge.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
|
||||||
TenantId tenantId = edge.getTenantId();
|
TenantId tenantId = edge.getTenantId();
|
||||||
try {
|
try {
|
||||||
|
if (actionType == ActionType.ADDED && edge.getRootRuleChainId() == null) {
|
||||||
|
edge.setRootRuleChainId(edgeTemplateRootRuleChain.getId());
|
||||||
|
}
|
||||||
Edge savedEdge = checkNotNull(edgeService.saveEdge(edge));
|
Edge savedEdge = checkNotNull(edgeService.saveEdge(edge));
|
||||||
EdgeId edgeId = savedEdge.getId();
|
EdgeId edgeId = savedEdge.getId();
|
||||||
|
|
||||||
|
|||||||
@ -825,26 +825,30 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
|||||||
assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue();
|
assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue();
|
||||||
|
|
||||||
assertThat(edgeImitator.findAllMessagesByType(QueueUpdateMsg.class)).as("one msg during sync process").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(QueueUpdateMsg.class)).as("one msg during sync process").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class)).as("one msg during sync process, another from edge creation").hasSize(2);
|
List<RuleChainUpdateMsg> ruleChainUpdateMsgs = edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class);
|
||||||
|
assertThat(ruleChainUpdateMsgs).as("one msg during sync process, another from edge creation").hasSize(2);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("one msg during sync process for 'default' device profile").hasSize(3);
|
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("one msg during sync process for 'default' device profile").hasSize(3);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class)).as("one msg once device assigned to edge").hasSize(2);
|
assertThat(edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class)).as("one msg once device assigned to edge").hasSize(2);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class)).as("two msgs during sync process for 'default' and 'test' asset profiles").hasSize(4);
|
assertThat(edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class)).as("two msgs during sync process for 'default' and 'test' asset profiles").hasSize(4);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("two msgs - one during sync process, and one more once asset assigned to edge").hasSize(2);
|
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("two msgs - one during sync process, and one more once asset assigned to edge").hasSize(2);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("one msg during sync process for tenant admin user").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("one msg during sync process for tenant admin user").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update").hasSize(4);
|
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update").hasSize(4);
|
||||||
|
verifyRuleChainMsgsAreRoot(ruleChainUpdateMsgs);
|
||||||
|
|
||||||
edgeImitator.expectMessageAmount(14);
|
edgeImitator.expectMessageAmount(14);
|
||||||
doPost("/api/edge/sync/" + edge.getId());
|
doPost("/api/edge/sync/" + edge.getId());
|
||||||
assertThat(edgeImitator.waitForMessages()).as("await for messages after edge sync rest api call").isTrue();
|
assertThat(edgeImitator.waitForMessages()).as("await for messages after edge sync rest api call").isTrue();
|
||||||
|
|
||||||
assertThat(edgeImitator.findAllMessagesByType(QueueUpdateMsg.class)).as("queue msg").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(QueueUpdateMsg.class)).as("queue msg").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class)).as("rule chain msg").hasSize(1);
|
ruleChainUpdateMsgs = edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class);
|
||||||
|
assertThat(ruleChainUpdateMsgs).as("rule chain msg").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("device profile msg").hasSize(2);
|
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("device profile msg").hasSize(2);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class)).as("asset profile msg").hasSize(3);
|
assertThat(edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class)).as("asset profile msg").hasSize(3);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("asset update msg").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("asset update msg").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("user update msg").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("user update msg").hasSize(1);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update msg").hasSize(4);
|
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update msg").hasSize(4);
|
||||||
assertThat(edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class)).as("asset update msg").hasSize(1);
|
assertThat(edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class)).as("asset update msg").hasSize(1);
|
||||||
|
verifyRuleChainMsgsAreRoot(ruleChainUpdateMsgs);
|
||||||
|
|
||||||
edgeImitator.allowIgnoredTypes();
|
edgeImitator.allowIgnoredTypes();
|
||||||
try {
|
try {
|
||||||
@ -860,6 +864,12 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
|||||||
.andExpect(status().isOk());
|
.andExpect(status().isOk());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyRuleChainMsgsAreRoot(List<RuleChainUpdateMsg> ruleChainUpdateMsgs) {
|
||||||
|
for (RuleChainUpdateMsg ruleChainUpdateMsg : ruleChainUpdateMsgs) {
|
||||||
|
Assert.assertTrue(ruleChainUpdateMsg.getRoot());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteEdgeWithDeleteRelationsOk() throws Exception {
|
public void testDeleteEdgeWithDeleteRelationsOk() throws Exception {
|
||||||
EdgeId edgeId = savedEdge("Edge for Test WithRelationsOk").getId();
|
EdgeId edgeId = savedEdge("Edge for Test WithRelationsOk").getId();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user