diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 5c683c3be9..70faba02fe 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -41,7 +41,11 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.page.TimePageData; import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.DeviceService; @@ -55,16 +59,20 @@ import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; +import org.thingsboard.server.gen.edge.NodeConnectionInfoProto; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.gen.edge.RuleChainConnectionInfoProto; +import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; +import org.thingsboard.server.gen.edge.RuleNodeProto; import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; import org.thingsboard.server.service.edge.EdgeContextComponent; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -146,7 +154,7 @@ public final class EdgeGrpcSession implements Cloneable { void processHandleMessages() throws ExecutionException, InterruptedException { Long queueStartTs = getQueueStartTs().get(); - // TODO: this 100 value must be chagned properly + // TODO: this 100 value must be changed properly TimePageLink pageLink = new TimePageLink(30, queueStartTs + 1000); TimePageData pageData; UUID ifOffset = null; @@ -179,6 +187,10 @@ public final class EdgeGrpcSession implements Cloneable { RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class); onRuleChainUpdated(msgType, ruleChain); break; + case RULE_CHAIN_METADATA: + RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class); + onRuleChainMetadataUpdated(msgType, ruleChainMetaData); + break; } } catch (Exception e) { log.error("Exception during processing records from queue", e); @@ -244,6 +256,15 @@ public final class EdgeGrpcSession implements Cloneable { .build()); } + private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) { + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); + if (ruleChainMetadataUpdateMsg != null) { + outputStream.onNext(ResponseMsg.newBuilder() + .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg) + .build()); + } + } + private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) { outputStream.onNext(ResponseMsg.newBuilder() .setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard)) @@ -281,6 +302,83 @@ public final class EdgeGrpcSession implements Cloneable { return builder.build(); } + private RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) { + try { + RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder() + .setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits()) + .setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits()) + .addAllNodes(constructNodes(ruleChainMetaData.getNodes())) + .addAllConnections(constructConnections(ruleChainMetaData.getConnections())) + .addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections())); + if (ruleChainMetaData.getFirstNodeIndex() != null) { + builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex()); + } + builder.setMsgType(msgType); + return builder.build(); + } catch (JsonProcessingException ex) { + log.error("Can't construct RuleChainMetadataUpdateMsg", ex); + } + return null; + } + + private List constructRuleChainConnections(List ruleChainConnections) throws JsonProcessingException { + List result = new ArrayList<>(); + if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) { + for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) { + result.add(constructRuleChainConnection(ruleChainConnectionInfo)); + } + } + return result; + } + + private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException { + return RuleChainConnectionInfoProto.newBuilder() + .setFromIndex(ruleChainConnectionInfo.getFromIndex()) + .setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits()) + .setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits()) + .setType(ruleChainConnectionInfo.getType()) + .setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo())) + .build(); + } + + private List constructConnections(List connections) { + List result = new ArrayList<>(); + if (connections != null && !connections.isEmpty()) { + for (NodeConnectionInfo connection : connections) { + result.add(constructConnection(connection)); + } + } + return result; + } + + private NodeConnectionInfoProto constructConnection(NodeConnectionInfo connection) { + return NodeConnectionInfoProto.newBuilder() + .setFromIndex(connection.getFromIndex()) + .setToIndex(connection.getToIndex()) + .setType(connection.getType()) + .build(); + } + + private List constructNodes(List nodes) throws JsonProcessingException { + List result = new ArrayList<>(); + if (nodes != null && !nodes.isEmpty()) { + for (RuleNode node : nodes) { + result.add(constructNode(node)); + } + } + return result; + } + + private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException { + return RuleNodeProto.newBuilder() + .setType(node.getType()) + .setName(node.getName()) + .setDebugMode(node.isDebugMode()) + .setConfiguration(objectMapper.writeValueAsString(node.getConfiguration())) + .setAdditionalInfo(objectMapper.writeValueAsString(node.getAdditionalInfo())) + .build(); + } + private DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) { DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder() .setMsgType(msgType) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index b9cafde6b6..46c9f0ddd6 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -15,16 +15,13 @@ */ package org.thingsboard.server.dao.edge; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.Event; -import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java new file mode 100644 index 0000000000..1be0d7c01d --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.edge; + +public enum EdgeQueueEntityType { + DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntry.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntry.java index 6dad707c90..c29722d82a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntry.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntry.java @@ -16,11 +16,10 @@ package org.thingsboard.server.common.data.edge; import lombok.Data; -import org.thingsboard.server.common.data.EntityType; @Data public class EdgeQueueEntry { private String type; - private EntityType entityType; + private EdgeQueueEntityType entityType; private String data; } diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 53c443ef1f..75b3756ce6 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -37,6 +37,7 @@ import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -75,6 +76,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { Consumer onAssetUpdate, Consumer onEntityViewUpdate, Consumer onRuleChainUpdate, + Consumer onRuleChainMetadataUpdate, Consumer onDashboardUpdate, Consumer onDownlink, Consumer onError) { @@ -90,7 +92,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { channel = builder.build(); EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel); log.info("[{}] Sending a connect request to the TB!", edgeKey); - this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onDashboardUpdate, onDownlink, onError)); + this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onRuleChainMetadataUpdate, onDashboardUpdate, onDownlink, onError)); this.inputStream.onNext(RequestMsg.newBuilder() .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE) .setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build()) @@ -120,6 +122,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { Consumer onAssetUpdate, Consumer onEntityViewUpdate, Consumer onRuleChainUpdate, + Consumer onRuleChainMetadataUpdate, Consumer onDashboardUpdate, Consumer onDownlink, Consumer onError) { @@ -150,6 +153,9 @@ public class EdgeGrpcClient implements EdgeRpcClient { } else if (responseMsg.hasRuleChainUpdateMsg()) { log.debug("[{}] Rule Chain udpate message received {}", edgeKey, responseMsg.getRuleChainUpdateMsg()); onRuleChainUpdate.accept(responseMsg.getRuleChainUpdateMsg()); + } else if (responseMsg.hasRuleChainMetadataUpdateMsg()) { + log.debug("[{}] Rule Chain Metadata udpate message received {}", edgeKey, responseMsg.getRuleChainMetadataUpdateMsg()); + onRuleChainMetadataUpdate.accept(responseMsg.getRuleChainMetadataUpdateMsg()); } else if (responseMsg.hasDashboardUpdateMsg()) { log.debug("[{}] Dashboard message received {}", edgeKey, responseMsg.getDashboardUpdateMsg()); onDashboardUpdate.accept(responseMsg.getDashboardUpdateMsg()); diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java index aa390a0d94..99ff2e4ddd 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java @@ -21,6 +21,7 @@ import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; +import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -37,6 +38,7 @@ public interface EdgeRpcClient { Consumer onAssetUpdate, Consumer onEntityViewUpdate, Consumer onRuleChainUpdate, + Consumer onRuleChainMetadataUpdate, Consumer onDashboardUpdate, Consumer onDownlink, Consumer onError); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 73fd216c70..a842c1952e 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -43,10 +43,11 @@ message ResponseMsg { UplinkResponseMsg uplinkResponseMsg = 2; DeviceUpdateMsg deviceUpdateMsg = 3; RuleChainUpdateMsg ruleChainUpdateMsg = 4; - DashboardUpdateMsg dashboardUpdateMsg = 5; - AssetUpdateMsg assetUpdateMsg = 6; - EntityViewUpdateMsg entityViewUpdateMsg = 7; - DownlinkMsg downlinkMsg = 8; + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 5; + DashboardUpdateMsg dashboardUpdateMsg = 6; + AssetUpdateMsg assetUpdateMsg = 7; + EntityViewUpdateMsg entityViewUpdateMsg = 8; + DownlinkMsg downlinkMsg = 9; } enum RequestMsgType { @@ -127,6 +128,38 @@ message RuleChainUpdateMsg { string configuration = 9; } +message RuleChainMetadataUpdateMsg { + UpdateMsgType msgType = 1; + int64 ruleChainIdMSB = 2; + int64 ruleChainIdLSB = 3; + int32 firstNodeIndex = 4; + repeated RuleNodeProto nodes = 5; + repeated NodeConnectionInfoProto connections = 6; + repeated RuleChainConnectionInfoProto ruleChainConnections = 7; +} + +message RuleNodeProto { + string type = 1; + string name = 2; + bool debugMode = 3; + string configuration = 4; + string additionalInfo = 5; +} + +message NodeConnectionInfoProto { + int32 fromIndex = 1; + int32 toIndex = 2; + string type = 3; +} + +message RuleChainConnectionInfoProto { + int32 fromIndex = 1; + int64 targetRuleChainIdMSB = 2; + int64 targetRuleChainIdLSB = 3; + string type = 4; + string additionalInfo = 5; +} + message DashboardUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java index 748d494020..c9de9fab67 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java @@ -15,12 +15,10 @@ */ package org.thingsboard.server.dao.edge; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; @@ -31,12 +29,16 @@ import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.ShortEdgeInfo; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeQueueEntityType; import org.thingsboard.server.common.data.edge.EdgeQueueEntry; import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; @@ -50,6 +52,7 @@ import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.dao.customer.CustomerDao; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -328,62 +331,118 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } - private void processDevice(TenantId tenantId, TbMsg tbMsg) { - // TODO - } - - private void processDashboard(TenantId tenantId, TbMsg tbMsg) { - processAssignedEntity(tenantId, tbMsg, EntityType.DASHBOARD); - } - - private void processEntityView(TenantId tenantId, TbMsg tbMsg) { - // TODO - } - - private void processAsset(TenantId tenantId, TbMsg tbMsg) { - // TODO - } - - private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EntityType entityType) { - EdgeId edgeId; + private void processDevice(TenantId tenantId, TbMsg tbMsg) throws IOException { switch (tbMsg.getType()) { case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId"))); - pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData()); - break; case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId"))); - pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData()); + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE); break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Device device = mapper.readValue(tbMsg.getData(), Device.class); + if (device.getEdgeId() != null) { + pushEventsToEdge(tenantId, device.getEdgeId(), EdgeQueueEntityType.DEVICE, tbMsg); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); } } + private void processAsset(TenantId tenantId, TbMsg tbMsg) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Asset asset = mapper.readValue(tbMsg.getData(), Asset.class); + if (asset.getEdgeId() != null) { + pushEventsToEdge(tenantId, asset.getEdgeId(), EdgeQueueEntityType.ASSET, tbMsg); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processEntityView(TenantId tenantId, TbMsg tbMsg) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class); + if (entityView.getEdgeId() != null) { + pushEventsToEdge(tenantId, entityView.getEdgeId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processDashboard(TenantId tenantId, TbMsg tbMsg) throws IOException { + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD); + } + private void processRuleChain(TenantId tenantId, TbMsg tbMsg) throws IOException { switch (tbMsg.getType()) { case DataConstants.ENTITY_ASSIGNED_TO_EDGE: case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - processAssignedEntity(tenantId, tbMsg, EntityType.RULE_CHAIN); + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN); break; case DataConstants.ENTITY_DELETED: case DataConstants.ENTITY_CREATED: case DataConstants.ENTITY_UPDATED: RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); - for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) { - pushEventToEdge(tenantId, assignedEdge.getEdgeId(), tbMsg.getType(), EntityType.RULE_CHAIN, tbMsg.getData()); + if (ruleChain.getAssignedEdges() != null && !ruleChain.getAssignedEdges().isEmpty()) { + for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) { + pushEventsToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg); + } } break; default: - log.warn("Unsupported message type " + tbMsg.getType()); + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); } - } - private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, String type, EntityType entityType, String data) { - log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type [{}], data [{}]", tenantId, edgeId, type, data); + private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType) throws IOException { + EdgeId edgeId; + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId"))); + pushEventsToEdge(tenantId, edgeId, entityType, tbMsg); + break; + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId"))); + pushEventsToEdge(tenantId, edgeId, entityType, tbMsg); + break; + } + } + + private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg) throws IOException { + log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg); + + pushEventsToEdge(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData()); + + if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) { + pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg); + } + } + + private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data) throws IOException { + log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data); EdgeQueueEntry queueEntry = new EdgeQueueEntry(); - queueEntry.setType(type); queueEntry.setEntityType(entityType); + queueEntry.setType(type); queueEntry.setData(data); Event event = new Event(); @@ -394,6 +453,20 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic eventService.saveAsync(event); } + private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg) throws IOException { + RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + case DataConstants.ENTITY_UPDATED: + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId()); + pushEventsToEdge(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData)); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + @Override public TimePageData findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink); diff --git a/ui/src/app/api/rule-chain.service.js b/ui/src/app/api/rule-chain.service.js index 51b2ff9e1b..c511f4ac05 100644 --- a/ui/src/app/api/rule-chain.service.js +++ b/ui/src/app/api/rule-chain.service.js @@ -388,17 +388,24 @@ function RuleChainService($http, $q, $filter, $ocLazyLoad, $translate, types, co } function prepareRuleChain(ruleChain) { + ruleChain.assignedEdgesText = ""; ruleChain.assignedEdgesIds = []; + if (ruleChain.assignedEdges && ruleChain.assignedEdges.length) { + var assignedEdgesTitles = []; for (var j = 0; j < ruleChain.assignedEdges.length; j++) { var assignedEdge = ruleChain.assignedEdges[j]; ruleChain.assignedEdgesIds.push(assignedEdge.edgeId.id); + assignedEdgesTitles.push(assignedEdge.title); } + ruleChain.assignedEdgesText = assignedEdgesTitles.join(', '); } + return ruleChain; } function cleanRuleChain(ruleChain) { + delete ruleChain.assignedEdgesText; delete ruleChain.assignedEdgesIds; return ruleChain; } diff --git a/ui/src/app/import-export/import-export.service.js b/ui/src/app/import-export/import-export.service.js index 1a130a71ef..1bef77b6b2 100644 --- a/ui/src/app/import-export/import-export.service.js +++ b/ui/src/app/import-export/import-export.service.js @@ -251,6 +251,7 @@ export default function ImportExport($log, $translate, $q, $mdDialog, $document, ruleChain.firstRuleNodeId = null; } ruleChain.root = false; + delete ruleChain.assignedEdgesText; return ruleChain; } diff --git a/ui/src/app/locale/locale.constant-en_US.json b/ui/src/app/locale/locale.constant-en_US.json index 0c4f13f239..0c99306d45 100644 --- a/ui/src/app/locale/locale.constant-en_US.json +++ b/ui/src/app/locale/locale.constant-en_US.json @@ -1436,7 +1436,8 @@ "assign-to-edges": "Assign Rule Chain(s) To Edges", "assign-to-edges-text": "Please select the edges to assign the rulechain(s)", "unassign-from-edges": "Unassign Rule Chain(s) From Edges", - "unassign-from-edges-text": "Please select the edges to unassign from the rulechain(s)" + "unassign-from-edges-text": "Please select the edges to unassign from the rulechain(s)", + "assigned-to-edges": "Assigned to edges" }, "rulenode": { "details": "Details", diff --git a/ui/src/app/rulechain/rulechain-card.scss b/ui/src/app/rulechain/rulechain-card.scss new file mode 100644 index 0000000000..c043fcf6b3 --- /dev/null +++ b/ui/src/app/rulechain/rulechain-card.scss @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +.tb-rule-chain-assigned-edges { + display: block; + display: -webkit-box; /* stylelint-disable-line value-no-vendor-prefix */ + height: 34px; + margin-bottom: 4px; + overflow: hidden; + text-overflow: ellipsis; + -webkit-line-clamp: 2; + -webkit-box-orient: vertical; /* stylelint-disable-line property-no-vendor-prefix */ +} diff --git a/ui/src/app/rulechain/rulechain-card.tpl.html b/ui/src/app/rulechain/rulechain-card.tpl.html index d628c5a0c3..8233f21912 100644 --- a/ui/src/app/rulechain/rulechain-card.tpl.html +++ b/ui/src/app/rulechain/rulechain-card.tpl.html @@ -15,4 +15,5 @@ limitations under the License. --> +
{{'rulechain.assigned-to-edges' | translate}}: '{{vm.item.assignedEdgesText}}'
rulechain.root
diff --git a/ui/src/app/rulechain/rulechain-fieldset.tpl.html b/ui/src/app/rulechain/rulechain-fieldset.tpl.html index 9fed2f4712..e0718273eb 100644 --- a/ui/src/app/rulechain/rulechain-fieldset.tpl.html +++ b/ui/src/app/rulechain/rulechain-fieldset.tpl.html @@ -36,6 +36,11 @@ + + + +
diff --git a/ui/src/app/rulechain/rulechain.directive.js b/ui/src/app/rulechain/rulechain.directive.js index 71f893a561..ae26d78ec8 100644 --- a/ui/src/app/rulechain/rulechain.directive.js +++ b/ui/src/app/rulechain/rulechain.directive.js @@ -37,11 +37,12 @@ export default function RuleChainDirective($compile, $templateCache, $mdDialog, scope: { ruleChain: '=', isEdit: '=', + ruleChainScope: '=', isReadOnly: '=', theForm: '=', onSetRootRuleChain: '&', onExportRuleChain: '&', - onDeleteRuleChain: '&' + onDeleteRuleChain: '&', } }; } diff --git a/ui/src/app/rulechain/rulechains.controller.js b/ui/src/app/rulechain/rulechains.controller.js index 3ccee70f1b..c19f62f6c0 100644 --- a/ui/src/app/rulechain/rulechains.controller.js +++ b/ui/src/app/rulechain/rulechains.controller.js @@ -22,6 +22,8 @@ import addRuleChainsToEdgeTemplate from "./add-rulechains-to-edge.tpl.html"; /* eslint-enable import/no-unresolved, import/default */ +import './rulechain-card.scss'; + /*@ngInject*/ export default function RuleChainsController(ruleChainService, userService, importExport, $state, $stateParams, $filter, $translate, $mdDialog, $document, $q, types) { diff --git a/ui/src/app/rulechain/rulechains.tpl.html b/ui/src/app/rulechain/rulechains.tpl.html index e68db2d77f..b2d9086c24 100644 --- a/ui/src/app/rulechain/rulechains.tpl.html +++ b/ui/src/app/rulechain/rulechains.tpl.html @@ -24,6 +24,7 @@