Added Rule Chain Metadata Update Msg. Added assigned edges text to rule chain
This commit is contained in:
parent
5e46f3e7af
commit
e0881cfc44
@ -41,7 +41,11 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.page.TimePageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
@ -55,16 +59,20 @@ import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.NodeConnectionInfoProto;
|
||||
import org.thingsboard.server.gen.edge.RequestMsg;
|
||||
import org.thingsboard.server.gen.edge.RequestMsgType;
|
||||
import org.thingsboard.server.gen.edge.ResponseMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainConnectionInfoProto;
|
||||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleNodeProto;
|
||||
import org.thingsboard.server.gen.edge.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.edge.UplinkMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
|
||||
import org.thingsboard.server.service.edge.EdgeContextComponent;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -146,7 +154,7 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
|
||||
void processHandleMessages() throws ExecutionException, InterruptedException {
|
||||
Long queueStartTs = getQueueStartTs().get();
|
||||
// TODO: this 100 value must be chagned properly
|
||||
// TODO: this 100 value must be changed properly
|
||||
TimePageLink pageLink = new TimePageLink(30, queueStartTs + 1000);
|
||||
TimePageData<Event> pageData;
|
||||
UUID ifOffset = null;
|
||||
@ -179,6 +187,10 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class);
|
||||
onRuleChainUpdated(msgType, ruleChain);
|
||||
break;
|
||||
case RULE_CHAIN_METADATA:
|
||||
RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class);
|
||||
onRuleChainMetadataUpdated(msgType, ruleChainMetaData);
|
||||
break;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Exception during processing records from queue", e);
|
||||
@ -244,6 +256,15 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
|
||||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
|
||||
if (ruleChainMetadataUpdateMsg != null) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard))
|
||||
@ -281,6 +302,83 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
|
||||
try {
|
||||
RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder()
|
||||
.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
|
||||
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
|
||||
.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
|
||||
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
|
||||
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections()));
|
||||
if (ruleChainMetaData.getFirstNodeIndex() != null) {
|
||||
builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex());
|
||||
}
|
||||
builder.setMsgType(msgType);
|
||||
return builder.build();
|
||||
} catch (JsonProcessingException ex) {
|
||||
log.error("Can't construct RuleChainMetadataUpdateMsg", ex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections) throws JsonProcessingException {
|
||||
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
|
||||
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
|
||||
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
|
||||
result.add(constructRuleChainConnection(ruleChainConnectionInfo));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
|
||||
return RuleChainConnectionInfoProto.newBuilder()
|
||||
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
|
||||
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
|
||||
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
|
||||
.setType(ruleChainConnectionInfo.getType())
|
||||
.setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
|
||||
List<NodeConnectionInfoProto> result = new ArrayList<>();
|
||||
if (connections != null && !connections.isEmpty()) {
|
||||
for (NodeConnectionInfo connection : connections) {
|
||||
result.add(constructConnection(connection));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private NodeConnectionInfoProto constructConnection(NodeConnectionInfo connection) {
|
||||
return NodeConnectionInfoProto.newBuilder()
|
||||
.setFromIndex(connection.getFromIndex())
|
||||
.setToIndex(connection.getToIndex())
|
||||
.setType(connection.getType())
|
||||
.build();
|
||||
}
|
||||
|
||||
private List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
|
||||
List<RuleNodeProto> result = new ArrayList<>();
|
||||
if (nodes != null && !nodes.isEmpty()) {
|
||||
for (RuleNode node : nodes) {
|
||||
result.add(constructNode(node));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
|
||||
return RuleNodeProto.newBuilder()
|
||||
.setType(node.getType())
|
||||
.setName(node.getName())
|
||||
.setDebugMode(node.isDebugMode())
|
||||
.setConfiguration(objectMapper.writeValueAsString(node.getConfiguration()))
|
||||
.setAdditionalInfo(objectMapper.writeValueAsString(node.getAdditionalInfo()))
|
||||
.build();
|
||||
}
|
||||
|
||||
private DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) {
|
||||
DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder()
|
||||
.setMsgType(msgType)
|
||||
|
||||
@ -15,16 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.edge;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.TextPageData;
|
||||
import org.thingsboard.server.common.data.page.TextPageLink;
|
||||
|
||||
@ -0,0 +1,20 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.edge;
|
||||
|
||||
public enum EdgeQueueEntityType {
|
||||
DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA
|
||||
}
|
||||
@ -16,11 +16,10 @@
|
||||
package org.thingsboard.server.common.data.edge;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
|
||||
@Data
|
||||
public class EdgeQueueEntry {
|
||||
private String type;
|
||||
private EntityType entityType;
|
||||
private EdgeQueueEntityType entityType;
|
||||
private String data;
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RequestMsg;
|
||||
import org.thingsboard.server.gen.edge.RequestMsgType;
|
||||
import org.thingsboard.server.gen.edge.ResponseMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
|
||||
@ -75,6 +76,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError) {
|
||||
@ -90,7 +92,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
channel = builder.build();
|
||||
EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
|
||||
log.info("[{}] Sending a connect request to the TB!", edgeKey);
|
||||
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onDashboardUpdate, onDownlink, onError));
|
||||
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onRuleChainMetadataUpdate, onDashboardUpdate, onDownlink, onError));
|
||||
this.inputStream.onNext(RequestMsg.newBuilder()
|
||||
.setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
|
||||
.setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build())
|
||||
@ -120,6 +122,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError) {
|
||||
@ -150,6 +153,9 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
} else if (responseMsg.hasRuleChainUpdateMsg()) {
|
||||
log.debug("[{}] Rule Chain udpate message received {}", edgeKey, responseMsg.getRuleChainUpdateMsg());
|
||||
onRuleChainUpdate.accept(responseMsg.getRuleChainUpdateMsg());
|
||||
} else if (responseMsg.hasRuleChainMetadataUpdateMsg()) {
|
||||
log.debug("[{}] Rule Chain Metadata udpate message received {}", edgeKey, responseMsg.getRuleChainMetadataUpdateMsg());
|
||||
onRuleChainMetadataUpdate.accept(responseMsg.getRuleChainMetadataUpdateMsg());
|
||||
} else if (responseMsg.hasDashboardUpdateMsg()) {
|
||||
log.debug("[{}] Dashboard message received {}", edgeKey, responseMsg.getDashboardUpdateMsg());
|
||||
onDashboardUpdate.accept(responseMsg.getDashboardUpdateMsg());
|
||||
|
||||
@ -21,6 +21,7 @@ import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
|
||||
@ -37,6 +38,7 @@ public interface EdgeRpcClient {
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError);
|
||||
|
||||
@ -43,10 +43,11 @@ message ResponseMsg {
|
||||
UplinkResponseMsg uplinkResponseMsg = 2;
|
||||
DeviceUpdateMsg deviceUpdateMsg = 3;
|
||||
RuleChainUpdateMsg ruleChainUpdateMsg = 4;
|
||||
DashboardUpdateMsg dashboardUpdateMsg = 5;
|
||||
AssetUpdateMsg assetUpdateMsg = 6;
|
||||
EntityViewUpdateMsg entityViewUpdateMsg = 7;
|
||||
DownlinkMsg downlinkMsg = 8;
|
||||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 5;
|
||||
DashboardUpdateMsg dashboardUpdateMsg = 6;
|
||||
AssetUpdateMsg assetUpdateMsg = 7;
|
||||
EntityViewUpdateMsg entityViewUpdateMsg = 8;
|
||||
DownlinkMsg downlinkMsg = 9;
|
||||
}
|
||||
|
||||
enum RequestMsgType {
|
||||
@ -127,6 +128,38 @@ message RuleChainUpdateMsg {
|
||||
string configuration = 9;
|
||||
}
|
||||
|
||||
message RuleChainMetadataUpdateMsg {
|
||||
UpdateMsgType msgType = 1;
|
||||
int64 ruleChainIdMSB = 2;
|
||||
int64 ruleChainIdLSB = 3;
|
||||
int32 firstNodeIndex = 4;
|
||||
repeated RuleNodeProto nodes = 5;
|
||||
repeated NodeConnectionInfoProto connections = 6;
|
||||
repeated RuleChainConnectionInfoProto ruleChainConnections = 7;
|
||||
}
|
||||
|
||||
message RuleNodeProto {
|
||||
string type = 1;
|
||||
string name = 2;
|
||||
bool debugMode = 3;
|
||||
string configuration = 4;
|
||||
string additionalInfo = 5;
|
||||
}
|
||||
|
||||
message NodeConnectionInfoProto {
|
||||
int32 fromIndex = 1;
|
||||
int32 toIndex = 2;
|
||||
string type = 3;
|
||||
}
|
||||
|
||||
message RuleChainConnectionInfoProto {
|
||||
int32 fromIndex = 1;
|
||||
int64 targetRuleChainIdMSB = 2;
|
||||
int64 targetRuleChainIdLSB = 3;
|
||||
string type = 4;
|
||||
string additionalInfo = 5;
|
||||
}
|
||||
|
||||
message DashboardUpdateMsg {
|
||||
UpdateMsgType msgType = 1;
|
||||
int64 idMSB = 2;
|
||||
|
||||
@ -15,12 +15,10 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.edge;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cache.Cache;
|
||||
@ -31,12 +29,16 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.ShortEdgeInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeQueueEntityType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
|
||||
import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
@ -50,6 +52,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.dao.customer.CustomerDao;
|
||||
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||
@ -328,62 +331,118 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
|
||||
}
|
||||
|
||||
private void processDevice(TenantId tenantId, TbMsg tbMsg) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
private void processDashboard(TenantId tenantId, TbMsg tbMsg) {
|
||||
processAssignedEntity(tenantId, tbMsg, EntityType.DASHBOARD);
|
||||
}
|
||||
|
||||
private void processEntityView(TenantId tenantId, TbMsg tbMsg) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
private void processAsset(TenantId tenantId, TbMsg tbMsg) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EntityType entityType) {
|
||||
EdgeId edgeId;
|
||||
private void processDevice(TenantId tenantId, TbMsg tbMsg) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
|
||||
pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData());
|
||||
break;
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
|
||||
pushEventToEdge(tenantId, edgeId, tbMsg.getType(), entityType, tbMsg.getData());
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE);
|
||||
break;
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
Device device = mapper.readValue(tbMsg.getData(), Device.class);
|
||||
if (device.getEdgeId() != null) {
|
||||
pushEventsToEdge(tenantId, device.getEdgeId(), EdgeQueueEntityType.DEVICE, tbMsg);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processAsset(TenantId tenantId, TbMsg tbMsg) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET);
|
||||
break;
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
Asset asset = mapper.readValue(tbMsg.getData(), Asset.class);
|
||||
if (asset.getEdgeId() != null) {
|
||||
pushEventsToEdge(tenantId, asset.getEdgeId(), EdgeQueueEntityType.ASSET, tbMsg);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processEntityView(TenantId tenantId, TbMsg tbMsg) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW);
|
||||
break;
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class);
|
||||
if (entityView.getEdgeId() != null) {
|
||||
pushEventsToEdge(tenantId, entityView.getEdgeId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processDashboard(TenantId tenantId, TbMsg tbMsg) throws IOException {
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD);
|
||||
}
|
||||
|
||||
private void processRuleChain(TenantId tenantId, TbMsg tbMsg) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
processAssignedEntity(tenantId, tbMsg, EntityType.RULE_CHAIN);
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN);
|
||||
break;
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
|
||||
for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) {
|
||||
pushEventToEdge(tenantId, assignedEdge.getEdgeId(), tbMsg.getType(), EntityType.RULE_CHAIN, tbMsg.getData());
|
||||
if (ruleChain.getAssignedEdges() != null && !ruleChain.getAssignedEdges().isEmpty()) {
|
||||
for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) {
|
||||
pushEventsToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported message type " + tbMsg.getType());
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, String type, EntityType entityType, String data) {
|
||||
log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type [{}], data [{}]", tenantId, edgeId, type, data);
|
||||
private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType) throws IOException {
|
||||
EdgeId edgeId;
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
|
||||
pushEventsToEdge(tenantId, edgeId, entityType, tbMsg);
|
||||
break;
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
|
||||
pushEventsToEdge(tenantId, edgeId, entityType, tbMsg);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg) throws IOException {
|
||||
log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg);
|
||||
|
||||
pushEventsToEdge(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData());
|
||||
|
||||
if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) {
|
||||
pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void pushEventsToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data) throws IOException {
|
||||
log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data);
|
||||
|
||||
EdgeQueueEntry queueEntry = new EdgeQueueEntry();
|
||||
queueEntry.setType(type);
|
||||
queueEntry.setEntityType(entityType);
|
||||
queueEntry.setType(type);
|
||||
queueEntry.setData(data);
|
||||
|
||||
Event event = new Event();
|
||||
@ -394,6 +453,20 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
eventService.saveAsync(event);
|
||||
}
|
||||
|
||||
private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg) throws IOException {
|
||||
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId());
|
||||
pushEventsToEdge(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData));
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) {
|
||||
return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink);
|
||||
|
||||
@ -388,17 +388,24 @@ function RuleChainService($http, $q, $filter, $ocLazyLoad, $translate, types, co
|
||||
}
|
||||
|
||||
function prepareRuleChain(ruleChain) {
|
||||
ruleChain.assignedEdgesText = "";
|
||||
ruleChain.assignedEdgesIds = [];
|
||||
|
||||
if (ruleChain.assignedEdges && ruleChain.assignedEdges.length) {
|
||||
var assignedEdgesTitles = [];
|
||||
for (var j = 0; j < ruleChain.assignedEdges.length; j++) {
|
||||
var assignedEdge = ruleChain.assignedEdges[j];
|
||||
ruleChain.assignedEdgesIds.push(assignedEdge.edgeId.id);
|
||||
assignedEdgesTitles.push(assignedEdge.title);
|
||||
}
|
||||
ruleChain.assignedEdgesText = assignedEdgesTitles.join(', ');
|
||||
}
|
||||
|
||||
return ruleChain;
|
||||
}
|
||||
|
||||
function cleanRuleChain(ruleChain) {
|
||||
delete ruleChain.assignedEdgesText;
|
||||
delete ruleChain.assignedEdgesIds;
|
||||
return ruleChain;
|
||||
}
|
||||
|
||||
@ -251,6 +251,7 @@ export default function ImportExport($log, $translate, $q, $mdDialog, $document,
|
||||
ruleChain.firstRuleNodeId = null;
|
||||
}
|
||||
ruleChain.root = false;
|
||||
delete ruleChain.assignedEdgesText;
|
||||
return ruleChain;
|
||||
}
|
||||
|
||||
|
||||
@ -1436,7 +1436,8 @@
|
||||
"assign-to-edges": "Assign Rule Chain(s) To Edges",
|
||||
"assign-to-edges-text": "Please select the edges to assign the rulechain(s)",
|
||||
"unassign-from-edges": "Unassign Rule Chain(s) From Edges",
|
||||
"unassign-from-edges-text": "Please select the edges to unassign from the rulechain(s)"
|
||||
"unassign-from-edges-text": "Please select the edges to unassign from the rulechain(s)",
|
||||
"assigned-to-edges": "Assigned to edges"
|
||||
},
|
||||
"rulenode": {
|
||||
"details": "Details",
|
||||
|
||||
25
ui/src/app/rulechain/rulechain-card.scss
Normal file
25
ui/src/app/rulechain/rulechain-card.scss
Normal file
@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
.tb-rule-chain-assigned-edges {
|
||||
display: block;
|
||||
display: -webkit-box; /* stylelint-disable-line value-no-vendor-prefix */
|
||||
height: 34px;
|
||||
margin-bottom: 4px;
|
||||
overflow: hidden;
|
||||
text-overflow: ellipsis;
|
||||
-webkit-line-clamp: 2;
|
||||
-webkit-box-orient: vertical; /* stylelint-disable-line property-no-vendor-prefix */
|
||||
}
|
||||
@ -15,4 +15,5 @@
|
||||
limitations under the License.
|
||||
|
||||
-->
|
||||
<div class="tb-small tb-rule-chain-assigned-edges" ng-show="vm.parentCtl.ruleChainsScope === 'tenant' && vm.item.assignedEdgesText">{{'rulechain.assigned-to-edges' | translate}}: '{{vm.item.assignedEdgesText}}'</div>
|
||||
<div ng-if="item && item.root" translate>rulechain.root</div>
|
||||
|
||||
@ -36,6 +36,11 @@
|
||||
</div>
|
||||
|
||||
<md-content class="md-padding tb-rulechain-fieldset" layout="column">
|
||||
<md-input-container class="md-block"
|
||||
ng-show="!isEdit && ruleChain.assignedEdgesText && ruleChainScope === 'tenant'">
|
||||
<label translate>rulechain.assigned-to-edges</label>
|
||||
<input ng-model="ruleChain.assignedEdgesText" disabled>
|
||||
</md-input-container>
|
||||
<fieldset ng-disabled="$root.loading || !isEdit || isReadOnly">
|
||||
<md-input-container class="md-block">
|
||||
<label translate>rulechain.name</label>
|
||||
|
||||
@ -37,11 +37,12 @@ export default function RuleChainDirective($compile, $templateCache, $mdDialog,
|
||||
scope: {
|
||||
ruleChain: '=',
|
||||
isEdit: '=',
|
||||
ruleChainScope: '=',
|
||||
isReadOnly: '=',
|
||||
theForm: '=',
|
||||
onSetRootRuleChain: '&',
|
||||
onExportRuleChain: '&',
|
||||
onDeleteRuleChain: '&'
|
||||
onDeleteRuleChain: '&',
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -22,6 +22,8 @@ import addRuleChainsToEdgeTemplate from "./add-rulechains-to-edge.tpl.html";
|
||||
|
||||
/* eslint-enable import/no-unresolved, import/default */
|
||||
|
||||
import './rulechain-card.scss';
|
||||
|
||||
/*@ngInject*/
|
||||
export default function RuleChainsController(ruleChainService, userService, importExport, $state,
|
||||
$stateParams, $filter, $translate, $mdDialog, $document, $q, types) {
|
||||
|
||||
@ -24,6 +24,7 @@
|
||||
<md-tab label="{{ 'rulechain.details' | translate }}">
|
||||
<tb-rule-chain rule-chain="vm.grid.operatingItem()"
|
||||
is-edit="vm.grid.detailsConfig.isDetailsEditMode"
|
||||
rule-chain-scope="vm.ruleChainsScope"
|
||||
is-read-only="vm.grid.isDetailsReadOnly(vm.grid.operatingItem())"
|
||||
the-form="vm.grid.detailsForm"
|
||||
on-set-root-rule-chain="vm.setRootRuleChain(event, vm.grid.detailsConfig.currentItem)"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user