Logic for handling rule chain updates from edge, including metadata

This commit is contained in:
Andrii Landiak 2023-09-08 14:54:02 +03:00
parent b1f70e162a
commit bb6d04be3b
13 changed files with 391 additions and 79 deletions

View File

@ -65,6 +65,8 @@ import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.RequestMsgType;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
@ -628,7 +630,7 @@ public final class EdgeGrpcSession implements Closeable {
case CUSTOMER:
return ctx.getCustomerProcessor().convertCustomerEventToDownlink(edgeEvent);
case RULE_CHAIN:
return ctx.getRuleChainProcessor().convertRuleChainEventToDownlink(edgeEvent);
return ctx.getRuleChainProcessor().convertRuleChainEventToDownlink(edgeEvent, this.edgeVersion);
case RULE_CHAIN_METADATA:
return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion);
case ALARM:
@ -690,6 +692,16 @@ public final class EdgeGrpcSession implements Closeable {
result.add(ctx.getAssetProcessor().processAssetMsgFromEdge(edge.getTenantId(), edge, assetUpdateMsg));
}
}
if (uplinkMsg.getRuleChainUpdateMsgCount() > 0) {
for (RuleChainUpdateMsg ruleChainUpdateMsg : uplinkMsg.getRuleChainUpdateMsgList()) {
result.add(ctx.getRuleChainProcessor().processRuleChainMsgFromEdge(edge.getTenantId(), edge, ruleChainUpdateMsg));
}
}
if (uplinkMsg.getRuleChainMetadataUpdateMsgCount() > 0) {
for (RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg : uplinkMsg.getRuleChainMetadataUpdateMsgList()) {
result.add(ctx.getRuleChainProcessor().processRuleChainMetadataMsgFromEdge(edge.getTenantId(), edge, ruleChainMetadataUpdateMsg));
}
}
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg));

View File

@ -53,7 +53,6 @@ public class EdgeSyncCursor {
public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge, boolean fullSync) {
if (fullSync) {
fetchers.add(new QueuesEdgeEventFetcher(ctx.getQueueService()));
fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService(), ctx.getFreemarkerConfig()));
fetchers.add(new TenantEdgeEventFetcher(ctx.getTenantService()));
fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
@ -64,6 +63,7 @@ public class EdgeSyncCursor {
fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));
}
}
fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
fetchers.add(new DashboardsEdgeEventFetcher(ctx.getDashboardService()));
fetchers.add(new DefaultProfilesEdgeEventFetcher(ctx.getDeviceProfileService(), ctx.getAssetProfileService()));
fetchers.add(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService()));

View File

@ -233,6 +233,9 @@ public abstract class BaseEdgeProcessor {
@Autowired
protected DataValidator<EntityView> entityViewValidator;
@Autowired
protected DataValidator<RuleChain> ruleChainValidator;
@Autowired
protected EdgeMsgConstructor edgeMsgConstructor;

View File

@ -0,0 +1,127 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.rule;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleNodeProto;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
@Slf4j
public class BaseRuleChainProcessor extends BaseEdgeProcessor {
protected boolean saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg) {
boolean created = false;
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
if (ruleChain == null) {
created = true;
ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setCreatedTime(Uuids.unixTimestamp(ruleChainId.getId()));
}
ruleChain.setName(ruleChainUpdateMsg.getName());
ruleChain.setType(RuleChainType.EDGE);
ruleChain.setDebugMode(ruleChainUpdateMsg.getDebugMode());
ruleChain.setConfiguration(JacksonUtil.toJsonNode(ruleChainUpdateMsg.getConfiguration()));
UUID firstRuleNodeUUID = safeGetUUID(ruleChainUpdateMsg.getFirstRuleNodeIdMSB(), ruleChainUpdateMsg.getFirstRuleNodeIdLSB());
ruleChain.setFirstRuleNodeId(firstRuleNodeUUID != null ? new RuleNodeId(firstRuleNodeUUID) : null);
ruleChainValidator.validate(ruleChain, RuleChain::getTenantId);
if (created) {
ruleChain.setId(ruleChainId);
}
ruleChainService.saveRuleChain(ruleChain, false);
return created;
}
protected boolean saveOrUpdateRuleChainMetadata(TenantId tenantId, RuleChainId ruleChainId, RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg) throws IOException {
RuleChainMetaData ruleChainMetadata = new RuleChainMetaData();
ruleChainMetadata.setRuleChainId(ruleChainId);
ruleChainMetadata.setNodes(parseNodeProtos(ruleChainId, ruleChainMetadataUpdateMsg.getNodesList()));
ruleChainMetadata.setConnections(parseConnectionProtos(ruleChainMetadataUpdateMsg.getConnectionsList()));
ruleChainMetadata.setRuleChainConnections(parseRuleChainConnectionProtos(ruleChainMetadataUpdateMsg.getRuleChainConnectionsList()));
if (ruleChainMetadataUpdateMsg.getFirstNodeIndex() != -1) {
ruleChainMetadata.setFirstNodeIndex(ruleChainMetadataUpdateMsg.getFirstNodeIndex());
}
if (ruleChainMetadata.getNodes().size() > 0) {
ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetadata, Function.identity());
return true;
}
return false;
}
private List<RuleNode> parseNodeProtos(RuleChainId ruleChainId, List<RuleNodeProto> nodesList) throws IOException {
List<RuleNode> result = new ArrayList<>();
for (RuleNodeProto proto : nodesList) {
RuleNode ruleNode = new RuleNode();
RuleNodeId ruleNodeId = new RuleNodeId(new UUID(proto.getIdMSB(), proto.getIdLSB()));
ruleNode.setId(ruleNodeId);
ruleNode.setCreatedTime(Uuids.unixTimestamp(ruleNodeId.getId()));
ruleNode.setRuleChainId(ruleChainId);
ruleNode.setType(proto.getType());
ruleNode.setName(proto.getName());
ruleNode.setDebugMode(proto.getDebugMode());
ruleNode.setConfiguration(JacksonUtil.OBJECT_MAPPER.readTree(proto.getConfiguration()));
ruleNode.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(proto.getAdditionalInfo()));
result.add(ruleNode);
}
return result;
}
private List<NodeConnectionInfo> parseConnectionProtos(List<org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto> connectionsList) {
List<NodeConnectionInfo> result = new ArrayList<>();
for (org.thingsboard.server.gen.edge.v1.NodeConnectionInfoProto proto : connectionsList) {
NodeConnectionInfo info = new NodeConnectionInfo();
info.setFromIndex(proto.getFromIndex());
info.setToIndex(proto.getToIndex());
info.setType(proto.getType());
result.add(info);
}
return result;
}
private List<RuleChainConnectionInfo> parseRuleChainConnectionProtos(List<org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto> ruleChainConnectionsList) throws IOException {
List<RuleChainConnectionInfo> result = new ArrayList<>();
for (org.thingsboard.server.gen.edge.v1.RuleChainConnectionInfoProto proto : ruleChainConnectionsList) {
RuleChainConnectionInfo info = new RuleChainConnectionInfo();
info.setFromIndex(proto.getFromIndex());
info.setTargetRuleChainId(new RuleChainId(new UUID(proto.getTargetRuleChainIdMSB(), proto.getTargetRuleChainIdLSB())));
info.setType(proto.getType());
info.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(proto.getAdditionalInfo()));
result.add(info);
}
return result;
}
}

View File

@ -15,29 +15,120 @@
*/
package org.thingsboard.server.service.edge.rpc.processor.rule;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.util.UUID;
import static org.thingsboard.server.service.edge.DefaultEdgeNotificationService.EDGE_IS_ROOT_BODY_KEY;
@Component
@Slf4j
@TbCoreComponent
public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
public class RuleChainEdgeProcessor extends BaseRuleChainProcessor {
public DownlinkMsg convertRuleChainEventToDownlink(EdgeEvent edgeEvent) {
public ListenableFuture<Void> processRuleChainMsgFromEdge(TenantId tenantId, Edge edge, RuleChainUpdateMsg ruleChainUpdateMsg) {
log.trace("[{}] executing processRuleChainMsgFromEdge [{}] from edge [{}]", tenantId, ruleChainUpdateMsg, edge.getName());
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB()));
try {
edgeSynchronizationManager.getSync().set(true);
switch (ruleChainUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg, edge);
return Futures.immediateFuture(null);
case ENTITY_DELETED_RPC_MESSAGE:
RuleChain ruleChainToDelete = ruleChainService.findRuleChainById(tenantId, ruleChainId);
if (ruleChainToDelete != null) {
ruleChainService.unassignRuleChainFromEdge(tenantId, ruleChainId, edge.getId(), false);
}
return Futures.immediateFuture(null);
case UNRECOGNIZED:
default:
return handleUnsupportedMsgType(ruleChainUpdateMsg.getMsgType());
}
} catch (DataValidationException e) {
if (e.getMessage().contains("limit reached")) {
log.warn("[{}] Number of allowed rule chains violated {}", tenantId, ruleChainUpdateMsg, e);
return Futures.immediateFuture(null);
} else {
return Futures.immediateFailedFuture(e);
}
} finally {
edgeSynchronizationManager.getSync().remove();
}
}
public ListenableFuture<Void> processRuleChainMetadataMsgFromEdge(TenantId tenantId, Edge edge, RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg) {
log.trace("[{}] executing processRuleChainMetadataMsgFromEdge [{}] from edge [{}]", tenantId, ruleChainMetadataUpdateMsg, edge.getName());
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataUpdateMsg.getRuleChainIdMSB(), ruleChainMetadataUpdateMsg.getRuleChainIdLSB()));
try {
edgeSynchronizationManager.getSync().set(true);
switch (ruleChainMetadataUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
saveOrUpdateRuleChainMetadata(tenantId, ruleChainId, ruleChainMetadataUpdateMsg);
return Futures.immediateFuture(null);
case UNRECOGNIZED:
default:
return handleUnsupportedMsgType(ruleChainMetadataUpdateMsg.getMsgType());
}
} catch (Exception e) {
String errMsg = String.format("Can't process rule chain metadata update msg %s", ruleChainMetadataUpdateMsg);
log.error(errMsg, e);
return Futures.immediateFailedFuture(new RuntimeException(errMsg, e));
} finally {
edgeSynchronizationManager.getSync().remove();
}
}
private void saveOrUpdateRuleChain(TenantId tenantId, RuleChainId ruleChainId, RuleChainUpdateMsg ruleChainUpdateMsg, Edge edge) {
boolean created = super.saveOrUpdateRuleChain(tenantId, ruleChainId, ruleChainUpdateMsg);
if (created) {
createRelationFromEdge(tenantId, edge.getId(), ruleChainId);
pushRuleChainCreatedEventToRuleEngine(tenantId, edge, ruleChainId);
ruleChainService.assignRuleChainToEdge(tenantId, ruleChainId, edge.getId());
}
if (ruleChainUpdateMsg.getRoot()) {
edge.setRootRuleChainId(ruleChainId);
edgeService.saveEdge(edge);
}
}
private void pushRuleChainCreatedEventToRuleEngine(TenantId tenantId, Edge edge, RuleChainId ruleChainId) {
try {
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
String ruleChainAsString = JacksonUtil.toString(ruleChain);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, null);
pushEntityEventToRuleEngine(tenantId, ruleChainId, null, TbMsgType.ENTITY_CREATED, ruleChainAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push rule chain action to rule engine: {}", tenantId, ruleChainId, TbMsgType.ENTITY_CREATED.name(), e);
}
}
public DownlinkMsg convertRuleChainEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEvent.getAction()) {
@ -55,10 +146,13 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
RuleChainUpdateMsg ruleChainUpdateMsg =
ruleChainMsgConstructor.constructRuleChainUpdatedMsg(msgType, ruleChain, isRoot);
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(edgeEvent.getTenantId(), msgType, ruleChainMetaData, edgeVersion);
downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addRuleChainUpdateMsg(ruleChainUpdateMsg)
.build();
.addRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg).build();
}
break;
case DELETED:

View File

@ -81,14 +81,12 @@ import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg;
import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UserUpdateMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
@ -129,36 +127,12 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
installation();
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
edgeImitator.expectMessageAmount(26);
edgeImitator.expectMessageAmount(27);
edgeImitator.connect();
requestEdgeRuleChainMetadata();
verifyEdgeConnectionAndInitialData();
}
private void requestEdgeRuleChainMetadata() throws Exception {
RuleChainId rootRuleChainId = getEdgeRootRuleChainId();
RuleChainMetadataRequestMsg.Builder builder = RuleChainMetadataRequestMsg.newBuilder()
.setRuleChainIdMSB(rootRuleChainId.getId().getMostSignificantBits())
.setRuleChainIdLSB(rootRuleChainId.getId().getLeastSignificantBits());
testAutoGeneratedCodeByProtobuf(builder);
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder()
.addRuleChainMetadataRequestMsg(builder.build());
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
}
private RuleChainId getEdgeRootRuleChainId() throws Exception {
List<RuleChain> edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?",
new TypeReference<PageData<RuleChain>>() {}, new PageLink(100)).getData();
for (RuleChain edgeRuleChain : edgeRuleChains) {
if (edgeRuleChain.isRoot()) {
return edgeRuleChain.getId();
}
}
throw new RuntimeException("Root rule chain not found");
}
@After
public void teardownEdgeTest() {
try {
@ -230,7 +204,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
// 2 messages - 1 from rule chain fetcher and 1 from rule chain controller
UUID ruleChainUUID = validateRuleChains();
// 1 from request message
// 2 messages - 1 from rule chain fetcher and 1 from rule chain controller (it goes along with RuleChainUpdateMsg)
validateRuleChainMetadataUpdates(ruleChainUUID);
// 4 messages - 4 messages from fetcher - 2 from system level ('mail', 'mailTemplates') and 2 from admin level ('mail', 'mailTemplates')
@ -385,11 +359,17 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private void validateRuleChainMetadataUpdates(UUID expectedRuleChainUUID) {
Optional<RuleChainMetadataUpdateMsg> ruleChainMetadataUpdateOpt = edgeImitator.findMessageByType(RuleChainMetadataUpdateMsg.class);
Assert.assertTrue(ruleChainMetadataUpdateOpt.isPresent());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ruleChainMetadataUpdateOpt.get();
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsg.getMsgType());
UUID ruleChainUUID = new UUID(ruleChainMetadataUpdateMsg.getRuleChainIdMSB(), ruleChainMetadataUpdateMsg.getRuleChainIdLSB());
List<RuleChainMetadataUpdateMsg> ruleChainMetadataUpdateMsgList = edgeImitator.findAllMessagesByType(RuleChainMetadataUpdateMsg.class);
Assert.assertEquals(2, ruleChainMetadataUpdateMsgList.size());
// metadata create msg
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgCreated = ruleChainMetadataUpdateMsgList.get(0);
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgCreated.getMsgType());
UUID ruleChainUUID = new UUID(ruleChainMetadataUpdateMsgCreated.getRuleChainIdMSB(), ruleChainMetadataUpdateMsgCreated.getRuleChainIdLSB());
Assert.assertEquals(expectedRuleChainUUID, ruleChainUUID);
// metadata update msg
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsgUpdated = ruleChainMetadataUpdateMsgList.get(1);
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, ruleChainMetadataUpdateMsgUpdated.getMsgType());
ruleChainUUID = new UUID(ruleChainMetadataUpdateMsgUpdated.getRuleChainIdMSB(), ruleChainMetadataUpdateMsgUpdated.getRuleChainIdLSB());
Assert.assertEquals(expectedRuleChainUUID, ruleChainUUID);
}

View File

@ -168,6 +168,7 @@ public class AssetEdgeTest extends AbstractEdgeTest {
public void testSendAssetToCloud() throws Exception {
UUID uuid = Uuids.timeBased();
// created asset on edge
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
AssetUpdateMsg.Builder assetUpdateMsgBuilder = AssetUpdateMsg.newBuilder();
assetUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
@ -191,6 +192,31 @@ public class AssetEdgeTest extends AbstractEdgeTest {
Asset asset = doGet("/api/asset/" + uuid, Asset.class);
Assert.assertNotNull(asset);
Assert.assertEquals("Asset Edge 2", asset.getName());
// updated asset on edge
uplinkMsgBuilder = UplinkMsg.newBuilder();
assetUpdateMsgBuilder = AssetUpdateMsg.newBuilder();
assetUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
assetUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
assetUpdateMsgBuilder.setName("Asset Edge 2 Updated");
assetUpdateMsgBuilder.setType("test");
assetUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
testAutoGeneratedCodeByProtobuf(assetUpdateMsgBuilder);
uplinkMsgBuilder.addAssetUpdateMsg(assetUpdateMsgBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.expectResponsesAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
latestResponseMsg = edgeImitator.getLatestResponseMsg();
Assert.assertTrue(latestResponseMsg.getSuccess());
asset = doGet("/api/asset/" + uuid, Asset.class);
Assert.assertNotNull(asset);
Assert.assertEquals("Asset Edge 2 Updated", asset.getName());
}
@Test

View File

@ -171,6 +171,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
public void testSendDashboardToCloud() throws Exception {
UUID uuid = Uuids.timeBased();
// create dashboard on edge
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
DashboardUpdateMsg.Builder dashboardUpdateMsgBuilder = DashboardUpdateMsg.newBuilder();
dashboardUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
@ -191,6 +192,28 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
Dashboard dashboard = doGet("/api/dashboard/" + uuid, Dashboard.class);
Assert.assertNotNull(dashboard);
Assert.assertEquals("Edge Test Dashboard", dashboard.getName());
// update dashboard on edge
uplinkMsgBuilder = UplinkMsg.newBuilder();
dashboardUpdateMsgBuilder = DashboardUpdateMsg.newBuilder();
dashboardUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
dashboardUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
dashboardUpdateMsgBuilder.setTitle("Edge Test Dashboard Updated");
dashboardUpdateMsgBuilder.setConfiguration("");
dashboardUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
testAutoGeneratedCodeByProtobuf(dashboardUpdateMsgBuilder);
uplinkMsgBuilder.addDashboardUpdateMsg(dashboardUpdateMsgBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.expectResponsesAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
dashboard = doGet("/api/dashboard/" + uuid, Dashboard.class);
Assert.assertNotNull(dashboard);
Assert.assertEquals("Edge Test Dashboard Updated", dashboard.getName());
}
@Test

View File

@ -538,6 +538,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
public void testSendDeviceToCloud() throws Exception {
UUID uuid = Uuids.timeBased();
// create device on edge
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
DeviceUpdateMsg.Builder deviceUpdateMsgBuilder = DeviceUpdateMsg.newBuilder();
deviceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
@ -566,6 +567,36 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
Device device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device);
Assert.assertEquals("Edge Device 2", device.getName());
// update device on edge
uplinkMsgBuilder = UplinkMsg.newBuilder();
deviceUpdateMsgBuilder = DeviceUpdateMsg.newBuilder();
deviceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
deviceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
deviceUpdateMsgBuilder.setName("Edge Device 2 Updated");
deviceUpdateMsgBuilder.setType("test");
deviceUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg);
latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage;
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device);
Assert.assertEquals("Edge Device 2 Updated", device.getName());
}
@Test

View File

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.edge;
import com.google.protobuf.AbstractMessage;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
@ -29,11 +29,10 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
import java.util.ArrayList;
import java.util.Collections;
@ -49,7 +48,7 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
@Test
public void testRuleChains() throws Exception {
// create rule chain
edgeImitator.expectMessageAmount(2);
edgeImitator.expectMessageAmount(4);
RuleChain ruleChain = new RuleChain();
ruleChain.setName("Edge Test Rule Chain");
ruleChain.setType(RuleChainType.EDGE);
@ -67,8 +66,6 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(ruleChainUpdateMsg.getIdLSB(), savedRuleChain.getUuidId().getLeastSignificantBits());
Assert.assertEquals(ruleChainUpdateMsg.getName(), savedRuleChain.getName());
testRuleChainMetadataRequestMsg(savedRuleChain.getId());
// unassign rule chain from edge
edgeImitator.expectMessageAmount(1);
doDelete("/api/edge/" + edge.getUuidId()
@ -89,55 +86,56 @@ public class RuleChainEdgeTest extends AbstractEdgeTest {
}
@Test
public void testSendRuleChainMetadataRequestToCloud() throws Exception {
RuleChainId edgeRootRuleChainId = edge.getRootRuleChainId();
public void testRuleChainToCloud() throws Exception {
UUID uuid = Uuids.timeBased();
// create rule chain on edge
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
RuleChainMetadataRequestMsg.Builder ruleChainMetadataRequestMsgBuilder = RuleChainMetadataRequestMsg.newBuilder();
ruleChainMetadataRequestMsgBuilder.setRuleChainIdMSB(edgeRootRuleChainId.getId().getMostSignificantBits());
ruleChainMetadataRequestMsgBuilder.setRuleChainIdLSB(edgeRootRuleChainId.getId().getLeastSignificantBits());
testAutoGeneratedCodeByProtobuf(ruleChainMetadataRequestMsgBuilder);
uplinkMsgBuilder.addRuleChainMetadataRequestMsg(ruleChainMetadataRequestMsgBuilder.build());
RuleChainUpdateMsg.Builder ruleChainUpdateMsgBuilder = RuleChainUpdateMsg.newBuilder();
ruleChainUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
ruleChainUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
ruleChainUpdateMsgBuilder.setName("Rule Chain Edge");
ruleChainUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
testAutoGeneratedCodeByProtobuf(ruleChainUpdateMsgBuilder);
uplinkMsgBuilder.addRuleChainUpdateMsg(ruleChainUpdateMsgBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof RuleChainMetadataUpdateMsg);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = (RuleChainMetadataUpdateMsg) latestMessage;
Assert.assertEquals(ruleChainMetadataUpdateMsg.getRuleChainIdMSB(), edgeRootRuleChainId.getId().getMostSignificantBits());
Assert.assertEquals(ruleChainMetadataUpdateMsg.getRuleChainIdLSB(), edgeRootRuleChainId.getId().getLeastSignificantBits());
UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg();
Assert.assertTrue(latestResponseMsg.getSuccess());
testAutoGeneratedCodeByProtobuf(ruleChainMetadataUpdateMsg);
}
RuleChain ruleChain = doGet("/api/ruleChain/" + uuid, RuleChain.class);
Assert.assertNotNull(ruleChain);
Assert.assertEquals("Rule Chain Edge", ruleChain.getName());
private void testRuleChainMetadataRequestMsg(RuleChainId ruleChainId) throws Exception {
RuleChainMetadataRequestMsg.Builder ruleChainMetadataRequestMsgBuilder = RuleChainMetadataRequestMsg.newBuilder()
.setRuleChainIdMSB(ruleChainId.getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainId.getId().getLeastSignificantBits());
testAutoGeneratedCodeByProtobuf(ruleChainMetadataRequestMsgBuilder);
// update rule chain on edge
uplinkMsgBuilder = UplinkMsg.newBuilder();
ruleChainUpdateMsgBuilder = RuleChainUpdateMsg.newBuilder();
ruleChainUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
ruleChainUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
ruleChainUpdateMsgBuilder.setName("Rule Chain Edge Updated");
ruleChainUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
testAutoGeneratedCodeByProtobuf(ruleChainUpdateMsgBuilder);
uplinkMsgBuilder.addRuleChainUpdateMsg(ruleChainUpdateMsgBuilder.build());
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder()
.addRuleChainMetadataRequestMsg(ruleChainMetadataRequestMsgBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof RuleChainMetadataUpdateMsg);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = (RuleChainMetadataUpdateMsg) latestMessage;
RuleChainId receivedRuleChainId =
new RuleChainId(new UUID(ruleChainMetadataUpdateMsg.getRuleChainIdMSB(), ruleChainMetadataUpdateMsg.getRuleChainIdLSB()));
Assert.assertEquals(ruleChainId, receivedRuleChainId);
Assert.assertTrue(edgeImitator.waitForResponses());
latestResponseMsg = edgeImitator.getLatestResponseMsg();
Assert.assertTrue(latestResponseMsg.getSuccess());
ruleChain = doGet("/api/ruleChain/" + uuid, RuleChain.class);
Assert.assertNotNull(ruleChain);
Assert.assertEquals("Rule Chain Edge Updated", ruleChain.getName());
}
private void createRuleChainMetadata(RuleChain ruleChain) {

View File

@ -44,6 +44,8 @@ public interface RuleChainService extends EntityDaoService {
RuleChain saveRuleChain(RuleChain ruleChain);
RuleChain saveRuleChain(RuleChain ruleChain, boolean doValidate);
boolean setRootRuleChain(TenantId tenantId, RuleChainId ruleChainId);
RuleChainUpdateResult saveRuleChainMetaData(TenantId tenantId, RuleChainMetaData ruleChainMetaData, Function<RuleNode, RuleNode> ruleNodeUpdater);

View File

@ -420,6 +420,7 @@ message TenantProfileUpdateMsg {
bytes profileDataBytes = 8;
}
// deprecated
message RuleChainMetadataRequestMsg {
int64 ruleChainIdMSB = 1;
int64 ruleChainIdLSB = 2;
@ -557,7 +558,7 @@ message UplinkMsg {
repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 4;
repeated AlarmUpdateMsg alarmUpdateMsg = 5;
repeated RelationUpdateMsg relationUpdateMsg = 6;
repeated RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg = 7;
repeated RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg = 7; // deprecated
repeated AttributesRequestMsg attributesRequestMsg = 8;
repeated RelationRequestMsg relationRequestMsg = 9;
repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10;
@ -571,6 +572,8 @@ message UplinkMsg {
repeated EntityViewUpdateMsg entityViewUpdateMsg = 18;
repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19;
repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20;
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 21;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 22;
}
message UplinkResponseMsg {

View File

@ -108,7 +108,20 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
@Override
@Transactional
public RuleChain saveRuleChain(RuleChain ruleChain) {
return doSaveRuleChain(ruleChain, true);
}
@Override
@Transactional
public RuleChain saveRuleChain(RuleChain ruleChain, boolean doValidate) {
return doSaveRuleChain(ruleChain, doValidate);
}
private RuleChain doSaveRuleChain(RuleChain ruleChain, boolean doValidate) {
log.trace("Executing doSaveRuleChain [{}]", ruleChain);
if (doValidate) {
ruleChainValidator.validate(ruleChain, RuleChain::getTenantId);
}
try {
RuleChain savedRuleChain = ruleChainDao.save(ruleChain.getTenantId(), ruleChain);
if (ruleChain.getId() == null) {