Added init service on edge connect

This commit is contained in:
Volodymyr Babak 2020-03-13 19:16:36 +02:00
parent 24d4aed56c
commit 69ba71be27
8 changed files with 311 additions and 133 deletions

View File

@ -29,6 +29,9 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.service.edge.rpc.alarm.AlarmMetadataConstructor;
import org.thingsboard.server.service.edge.rpc.init.InitEdgeService;
import org.thingsboard.server.service.edge.rpc.ruleChain.RuleChainMetadataConstructor;
@Component
@Data
@ -73,4 +76,16 @@ public class EdgeContextComponent {
@Lazy
@Autowired
private ActorService actorService;
@Lazy
@Autowired
private InitEdgeService initEdgeService;
@Lazy
@Autowired
private RuleChainMetadataConstructor ruleChainMetadataConstructor;
@Lazy
@Autowired
private AlarmMetadataConstructor alarmMetadataConstructor;
}

View File

@ -150,6 +150,9 @@ public final class EdgeGrpcSession implements Cloneable {
if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
}
if (ConnectResponseCode.ACCEPTED == responseMsg.getResponseCode()) {
ctx.getInitEdgeService().init(edge, outputStream);
}
}
if (connected) {
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) {
@ -403,7 +406,7 @@ public final class EdgeGrpcSession implements Cloneable {
private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) {
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
.setRuleChainUpdateMsg(constructRuleChainUpdatedMsg(msgType, ruleChain))
.setRuleChainUpdateMsg(ctx.getRuleChainMetadataConstructor().constructRuleChainUpdatedMsg(edge, msgType, ruleChain))
.build();
outputStream.onNext(ResponseMsg.newBuilder()
.setEntityUpdateMsg(entityUpdateMsg)
@ -411,7 +414,8 @@ public final class EdgeGrpcSession implements Cloneable {
}
private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ctx.getRuleChainMetadataConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
if (ruleChainMetadataUpdateMsg != null) {
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
@ -433,43 +437,13 @@ public final class EdgeGrpcSession implements Cloneable {
private void onAlarmUpdated(UpdateMsgType msgType, Alarm alarm) {
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
.setAlarmUpdateMsg(constructAlarmUpdatedMsg(msgType, alarm))
.setAlarmUpdateMsg(ctx.getAlarmMetadataConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
.build();
outputStream.onNext(ResponseMsg.newBuilder()
.setEntityUpdateMsg(entityUpdateMsg)
.build());
}
private AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm) {
String entityName = null;
switch (alarm.getOriginator().getEntityType()) {
case DEVICE:
entityName = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(alarm.getOriginator().getId())).getName();
break;
case ASSET:
entityName = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(alarm.getOriginator().getId())).getName();
break;
case ENTITY_VIEW:
entityName = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(alarm.getOriginator().getId())).getName();
break;
}
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
.setMsgType(msgType)
.setName(alarm.getName())
.setType(alarm.getType())
.setOriginatorName(entityName)
.setOriginatorType(alarm.getOriginator().getEntityType().name())
.setSeverity(alarm.getSeverity().name())
.setStatus(alarm.getStatus().name())
.setStartTs(alarm.getStartTs())
.setEndTs(alarm.getEndTs())
.setAckTs(alarm.getAckTs())
.setClearTs(alarm.getClearTs())
.setDetails(JacksonUtil.toString(alarm.getDetails()))
.setPropagate(alarm.isPropagate());
return builder.build();
}
private UpdateMsgType getResponseMsgType(String msgType) {
if (msgType.equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
msgType.equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
@ -496,22 +470,6 @@ public final class EdgeGrpcSession implements Cloneable {
}
}
private RuleChainUpdateMsg constructRuleChainUpdatedMsg(UpdateMsgType msgType, RuleChain ruleChain) {
RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder()
.setMsgType(msgType)
.setIdMSB(ruleChain.getId().getId().getMostSignificantBits())
.setIdLSB(ruleChain.getId().getId().getLeastSignificantBits())
.setName(ruleChain.getName())
.setRoot(ruleChain.getId().equals(edge.getRootRuleChainId()))
.setDebugMode(ruleChain.isDebugMode())
.setConfiguration(JacksonUtil.toString(ruleChain.getConfiguration()));
if (ruleChain.getFirstRuleNodeId() != null) {
builder.setFirstRuleNodeIdMSB(ruleChain.getFirstRuleNodeId().getId().getMostSignificantBits())
.setFirstRuleNodeIdLSB(ruleChain.getFirstRuleNodeId().getId().getLeastSignificantBits());
}
return builder.build();
}
private DownlinkMsg constructDownlinkEntityDataMsg(String entityName, TbMsg tbMsg) {
EntityDataProto entityData = EntityDataProto.newBuilder()
.setEntityName(entityName)
@ -523,85 +481,6 @@ public final class EdgeGrpcSession implements Cloneable {
return builder.build();
}
private RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
try {
RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder()
.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections()));
if (ruleChainMetaData.getFirstNodeIndex() != null) {
builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex());
}
builder.setMsgType(msgType);
return builder.build();
} catch (JsonProcessingException ex) {
log.error("Can't construct RuleChainMetadataUpdateMsg", ex);
}
return null;
}
private List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections) throws JsonProcessingException {
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
result.add(constructRuleChainConnection(ruleChainConnectionInfo));
}
}
return result;
}
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
return RuleChainConnectionInfoProto.newBuilder()
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
.setType(ruleChainConnectionInfo.getType())
.setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
.build();
}
private List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
List<NodeConnectionInfoProto> result = new ArrayList<>();
if (connections != null && !connections.isEmpty()) {
for (NodeConnectionInfo connection : connections) {
result.add(constructConnection(connection));
}
}
return result;
}
private NodeConnectionInfoProto constructConnection(NodeConnectionInfo connection) {
return NodeConnectionInfoProto.newBuilder()
.setFromIndex(connection.getFromIndex())
.setToIndex(connection.getToIndex())
.setType(connection.getType())
.build();
}
private List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
List<RuleNodeProto> result = new ArrayList<>();
if (nodes != null && !nodes.isEmpty()) {
for (RuleNode node : nodes) {
result.add(constructNode(node));
}
}
return result;
}
private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
return RuleNodeProto.newBuilder()
.setIdMSB(node.getId().getId().getMostSignificantBits())
.setIdLSB(node.getId().getId().getLeastSignificantBits())
.setType(node.getType())
.setName(node.getName())
.setDebugMode(node.isDebugMode())
.setConfiguration(objectMapper.writeValueAsString(node.getConfiguration()))
.setAdditionalInfo(objectMapper.writeValueAsString(node.getAdditionalInfo()))
.build();
}
private DashboardUpdateMsg constructDashboardUpdatedMsg(UpdateMsgType msgType, Dashboard dashboard) {
dashboard = ctx.getDashboardService().findDashboardById(edge.getTenantId(), dashboard.getId());
DashboardUpdateMsg.Builder builder = DashboardUpdateMsg.newBuilder()

View File

@ -0,0 +1,63 @@
package org.thingsboard.server.service.edge.rpc.alarm;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.jcajce.provider.symmetric.DES;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
@Component
@Slf4j
public class AlarmMetadataConstructor {
@Autowired
private DeviceService deviceService;
@Autowired
private AssetService assetService;
@Autowired
private EntityViewService entityViewService;
public AlarmUpdateMsg constructAlarmUpdatedMsg(TenantId tenantId, UpdateMsgType msgType, Alarm alarm) {
String entityName = null;
switch (alarm.getOriginator().getEntityType()) {
case DEVICE:
entityName = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId())).getName();
break;
case ASSET:
entityName = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId())).getName();
break;
case ENTITY_VIEW:
entityName = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId())).getName();
break;
}
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
.setMsgType(msgType)
.setName(alarm.getName())
.setType(alarm.getType())
.setOriginatorName(entityName)
.setOriginatorType(alarm.getOriginator().getEntityType().name())
.setSeverity(alarm.getSeverity().name())
.setStatus(alarm.getStatus().name())
.setStartTs(alarm.getStartTs())
.setEndTs(alarm.getEndTs())
.setAckTs(alarm.getAckTs())
.setClearTs(alarm.getClearTs())
.setDetails(JacksonUtil.toString(alarm.getDetails()))
.setPropagate(alarm.isPropagate());
return builder.build();
}
}

View File

@ -0,0 +1,80 @@
package org.thingsboard.server.service.edge.rpc.init;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.service.edge.rpc.ruleChain.RuleChainMetadataConstructor;
@Service
@Slf4j
public class DefaultInitEdgeService implements InitEdgeService {
@Autowired
private RuleChainService ruleChainService;
@Autowired
private RuleChainMetadataConstructor ruleChainMetadataConstructor;
@Override
public void init(Edge edge, StreamObserver<ResponseMsg> outputStream) {
initRuleChains(edge, outputStream);
}
private void initRuleChains(Edge edge, StreamObserver<ResponseMsg> outputStream) {
try {
TimePageLink pageLink = new TimePageLink(100);
TimePageData<RuleChain> pageData;
do {
pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get();
if (!pageData.getData().isEmpty()) {
log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (RuleChain ruleChain : pageData.getData()) {
RuleChainUpdateMsg ruleChainUpdateMsg =
ruleChainMetadataConstructor.constructRuleChainUpdatedMsg(
edge,
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
ruleChain);
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
.setRuleChainUpdateMsg(ruleChainUpdateMsg)
.build();
outputStream.onNext(ResponseMsg.newBuilder()
.setEntityUpdateMsg(entityUpdateMsg)
.build());
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChain.getId());
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ruleChainMetadataConstructor.constructRuleChainMetadataUpdatedMsg(
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE,
ruleChainMetaData);
if (ruleChainMetadataUpdateMsg != null) {
entityUpdateMsg = EntityUpdateMsg.newBuilder()
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
.build();
outputStream.onNext(ResponseMsg.newBuilder()
.setEntityUpdateMsg(entityUpdateMsg)
.build());
}
}
}
if (pageData.hasNext()) {
pageLink = pageData.getNextPageLink();
}
} while (pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge rule chains on init!");
}
}
}

View File

@ -0,0 +1,10 @@
package org.thingsboard.server.service.edge.rpc.init;
import io.grpc.stub.StreamObserver;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.gen.edge.ResponseMsg;
public interface InitEdgeService {
void init(Edge edge, StreamObserver<ResponseMsg> outputStream);
}

View File

@ -0,0 +1,127 @@
package org.thingsboard.server.service.edge.rpc.ruleChain;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import org.thingsboard.server.gen.edge.NodeConnectionInfoProto;
import org.thingsboard.server.gen.edge.RuleChainConnectionInfoProto;
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.RuleNodeProto;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import java.util.ArrayList;
import java.util.List;
@Component
@Slf4j
public class RuleChainMetadataConstructor {
private static final ObjectMapper objectMapper = new ObjectMapper();
public RuleChainUpdateMsg constructRuleChainUpdatedMsg(Edge edge, UpdateMsgType msgType, RuleChain ruleChain) {
RuleChainUpdateMsg.Builder builder = RuleChainUpdateMsg.newBuilder()
.setMsgType(msgType)
.setIdMSB(ruleChain.getId().getId().getMostSignificantBits())
.setIdLSB(ruleChain.getId().getId().getLeastSignificantBits())
.setName(ruleChain.getName())
.setRoot(ruleChain.getId().equals(edge.getRootRuleChainId()))
.setDebugMode(ruleChain.isDebugMode())
.setConfiguration(JacksonUtil.toString(ruleChain.getConfiguration()));
if (ruleChain.getFirstRuleNodeId() != null) {
builder.setFirstRuleNodeIdMSB(ruleChain.getFirstRuleNodeId().getId().getMostSignificantBits())
.setFirstRuleNodeIdLSB(ruleChain.getFirstRuleNodeId().getId().getLeastSignificantBits());
}
return builder.build();
}
public RuleChainMetadataUpdateMsg constructRuleChainMetadataUpdatedMsg(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
try {
RuleChainMetadataUpdateMsg.Builder builder = RuleChainMetadataUpdateMsg.newBuilder()
.setRuleChainIdMSB(ruleChainMetaData.getRuleChainId().getId().getMostSignificantBits())
.setRuleChainIdLSB(ruleChainMetaData.getRuleChainId().getId().getLeastSignificantBits())
.addAllNodes(constructNodes(ruleChainMetaData.getNodes()))
.addAllConnections(constructConnections(ruleChainMetaData.getConnections()))
.addAllRuleChainConnections(constructRuleChainConnections(ruleChainMetaData.getRuleChainConnections()));
if (ruleChainMetaData.getFirstNodeIndex() != null) {
builder.setFirstNodeIndex(ruleChainMetaData.getFirstNodeIndex());
}
builder.setMsgType(msgType);
return builder.build();
} catch (JsonProcessingException ex) {
log.error("Can't construct RuleChainMetadataUpdateMsg", ex);
}
return null;
}
private List<NodeConnectionInfoProto> constructConnections(List<NodeConnectionInfo> connections) {
List<NodeConnectionInfoProto> result = new ArrayList<>();
if (connections != null && !connections.isEmpty()) {
for (NodeConnectionInfo connection : connections) {
result.add(constructConnection(connection));
}
}
return result;
}
private NodeConnectionInfoProto constructConnection(NodeConnectionInfo connection) {
return NodeConnectionInfoProto.newBuilder()
.setFromIndex(connection.getFromIndex())
.setToIndex(connection.getToIndex())
.setType(connection.getType())
.build();
}
private List<RuleNodeProto> constructNodes(List<RuleNode> nodes) throws JsonProcessingException {
List<RuleNodeProto> result = new ArrayList<>();
if (nodes != null && !nodes.isEmpty()) {
for (RuleNode node : nodes) {
result.add(constructNode(node));
}
}
return result;
}
private List<RuleChainConnectionInfoProto> constructRuleChainConnections(List<RuleChainConnectionInfo> ruleChainConnections) throws JsonProcessingException {
List<RuleChainConnectionInfoProto> result = new ArrayList<>();
if (ruleChainConnections != null && !ruleChainConnections.isEmpty()) {
for (RuleChainConnectionInfo ruleChainConnectionInfo : ruleChainConnections) {
result.add(constructRuleChainConnection(ruleChainConnectionInfo));
}
}
return result;
}
private RuleChainConnectionInfoProto constructRuleChainConnection(RuleChainConnectionInfo ruleChainConnectionInfo) throws JsonProcessingException {
return RuleChainConnectionInfoProto.newBuilder()
.setFromIndex(ruleChainConnectionInfo.getFromIndex())
.setTargetRuleChainIdMSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getMostSignificantBits())
.setTargetRuleChainIdLSB(ruleChainConnectionInfo.getTargetRuleChainId().getId().getLeastSignificantBits())
.setType(ruleChainConnectionInfo.getType())
.setAdditionalInfo(objectMapper.writeValueAsString(ruleChainConnectionInfo.getAdditionalInfo()))
.build();
}
private RuleNodeProto constructNode(RuleNode node) throws JsonProcessingException {
return RuleNodeProto.newBuilder()
.setIdMSB(node.getId().getId().getMostSignificantBits())
.setIdLSB(node.getId().getId().getLeastSignificantBits())
.setType(node.getType())
.setName(node.getName())
.setDebugMode(node.isDebugMode())
.setConfiguration(objectMapper.writeValueAsString(node.getConfiguration()))
.setAdditionalInfo(objectMapper.writeValueAsString(node.getAdditionalInfo()))
.build();
}
}

View File

@ -81,10 +81,10 @@ message ConnectResponseMsg {
message EdgeConfiguration {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string name = 5;
string routingKey = 6;
string type = 7;
string cloudType = 8;
string name = 3;
string routingKey = 4;
string type = 5;
string cloudType = 6;
}
enum UpdateMsgType {
@ -237,7 +237,7 @@ message UplinkMsg {
int32 uplinkMsgId = 1;
repeated EntityDataProto entityData = 2;
repeated DeviceUpdateMsg deviceUpdateMsg = 3;
repeated AlarmUpdateMsg alarmUpdatemsg = 4;
repeated AlarmUpdateMsg alarmUpdateMsg = 4;
}
message UplinkResponseMsg {

View File

@ -48,6 +48,10 @@ public class TbMsgPushToEdgeNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if ("edge".equalsIgnoreCase(msg.getMetaData().getValue("source"))) {
return;
}
msg.getMetaData().putValue("source", "cloud");
ctx.getEdgeService().pushEventToEdge(ctx.getTenantId(), msg, new PushToEdgeNodeCallback(ctx, msg));
}