From 85fcfef8a5e45b88f359fe3242dbfc2b242123ec Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 15 Sep 2020 13:12:53 +0300 Subject: [PATCH] Added support for RPC call for edge devices --- .../server/actors/ActorSystemContext.java | 3 + .../service/edge/rpc/EdgeGrpcSession.java | 18 +++ .../rpc/constructor/DeviceMsgConstructor.java | 25 ++++- .../edge/rpc/processor/BaseProcessor.java | 4 + .../edge/rpc/processor/DeviceProcessor.java | 17 +++ .../rpc/DefaultTbRuleEngineRpcService.java | 3 +- .../rpc/TbRuleEngineDeviceRpcService.java | 3 + .../src/main/resources/thingsboard.yml | 2 +- common/edge-api/src/main/proto/edge.proto | 25 ++++- .../server/dao/edge/EdgeServiceImpl.java | 65 ++++++----- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 105 ++++++++---------- .../rule/engine/rpc/TbSendRPCRequestNode.java | 72 ++++++++++-- 12 files changed, 238 insertions(+), 104 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 3f4dcb3c0b..b95e1d92f8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -255,12 +255,15 @@ public class ActorSystemContext { @Getter private TbCoreDeviceRpcService tbCoreDeviceRpcService; + @Lazy @Autowired(required = false) @Getter private EdgeService edgeService; + @Lazy @Autowired(required = false) @Getter private EdgeEventService edgeEventService; + @Lazy @Autowired(required = false) @Getter private EdgeRpcService edgeRpcService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 078978435d..5332810ee2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -78,6 +78,7 @@ import org.thingsboard.server.gen.edge.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.DownlinkResponseMsg; @@ -333,6 +334,9 @@ public final class EdgeGrpcSession implements Closeable { case ENTITY_EXISTS_REQUEST: downlinkMsg = processEntityExistsRequestMessage(edgeEvent); break; + case RPC_CALL: + downlinkMsg = processRpcCallMsg(edgeEvent); + break; } if (downlinkMsg != null) { result.add(downlinkMsg); @@ -358,6 +362,15 @@ public final class EdgeGrpcSession implements Closeable { return downlinkMsg; } + private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) { + log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent); + DeviceRpcCallMsg deviceRpcCallMsg = + ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityBody()); + return DownlinkMsg.newBuilder() + .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg)) + .build(); + } + private DownlinkMsg processCredentialsRequestMessage(EdgeEvent edgeEvent) { DownlinkMsg downlinkMsg = null; if (EdgeEventType.DEVICE.equals(edgeEvent.getEdgeEventType())) { @@ -883,6 +896,11 @@ public final class EdgeGrpcSession implements Closeable { result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge, deviceCredentialsRequestMsg)); } } + if (uplinkMsg.getDeviceRpcCallMsgList() != null && !uplinkMsg.getDeviceRpcCallMsgList().isEmpty()) { + for (DeviceRpcCallMsg deviceRpcCallMsg: uplinkMsg.getDeviceRpcCallMsgList()) { + result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg)); + } + } } catch (Exception e) { log.error("Can't process uplink msg [{}]", uplinkMsg, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index ebc180d0f9..38905d7529 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -15,21 +15,27 @@ */ package org.thingsboard.server.service.edge.rpc.constructor; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; +import org.thingsboard.server.gen.edge.RpcRequestMsg; import org.thingsboard.server.gen.edge.UpdateMsgType; @Component @Slf4j public class DeviceMsgConstructor { + protected static final ObjectMapper mapper = new ObjectMapper(); + public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, CustomerId customerId) { DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() .setMsgType(msgType) @@ -67,4 +73,21 @@ public class DeviceMsgConstructor { .setIdMSB(deviceId.getId().getMostSignificantBits()) .setIdLSB(deviceId.getId().getLeastSignificantBits()).build(); } + + public DeviceRpcCallMsg constructDeviceRpcCallMsg(JsonNode body) { + RuleEngineDeviceRpcRequest request = mapper.convertValue(body, RuleEngineDeviceRpcRequest.class); + RpcRequestMsg.Builder requestBuilder = RpcRequestMsg.newBuilder(); + requestBuilder.setMethod(request.getMethod()); + requestBuilder.setParams(request.getBody()); + DeviceRpcCallMsg.Builder builder = DeviceRpcCallMsg.newBuilder() + .setDeviceIdMSB(request.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(request.getDeviceId().getId().getLeastSignificantBits()) + .setRequestIdMSB(request.getRequestUUID().getMostSignificantBits()) + .setRequestIdLSB(request.getRequestUUID().getLeastSignificantBits()) + .setExpirationTime(request.getExpirationTime()) + .setOriginServiceId(request.getOriginServiceId()) + .setOneway(request.isOneway()) + .setRequestMsg(requestBuilder.build()); + return builder.build(); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java index ee5c545999..08597b1743 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java @@ -39,6 +39,7 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.state.DeviceStateService; @Slf4j @@ -46,6 +47,9 @@ public abstract class BaseProcessor { protected static final ObjectMapper mapper = new ObjectMapper(); + @Autowired + protected TbRuleEngineDeviceRpcService tbDeviceRpcService; + @Autowired protected AlarmService alarmService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java index db122e31f9..b1f588c6f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java @@ -21,7 +21,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; +import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.audit.ActionType; @@ -40,9 +42,11 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; @@ -213,4 +217,17 @@ public class DeviceProcessor extends BaseProcessor { metaData.putValue("edgeName", edge.getName()); return metaData; } + + public ListenableFuture processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { + UUID uuid = new UUID(deviceRpcCallMsg.getRequestIdMSB(), deviceRpcCallMsg.getRequestIdLSB()); + FromDeviceRpcResponse response; + if (!StringUtils.isEmpty(deviceRpcCallMsg.getResponseMsg().getError())) { + response = new FromDeviceRpcResponse(uuid, null, RpcError.valueOf(deviceRpcCallMsg.getResponseMsg().getError())); + } else { + response = new FromDeviceRpcResponse(uuid, deviceRpcCallMsg.getResponseMsg().getResponse(), null); + } + tbDeviceRpcService.sendRpcResponseToTbCore(deviceRpcCallMsg.getOriginServiceId(), response); + return Futures.immediateFuture(null); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 80d47a84e8..9ebbed0e44 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -151,7 +151,8 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } } - private void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response) { + @Override + public void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response) { if (serviceId.equals(originServiceId)) { if (tbCoreRpcService.isPresent()) { tbCoreRpcService.get().processRpcResponseFromRuleEngine(response); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbRuleEngineDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbRuleEngineDeviceRpcService.java index 3d4f23a796..4508da381e 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/TbRuleEngineDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbRuleEngineDeviceRpcService.java @@ -29,4 +29,7 @@ public interface TbRuleEngineDeviceRpcService extends RuleEngineRpcService { */ void processRpcResponseFromDevice(FromDeviceRpcResponse response); + + void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response); + } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 3845517ffb..2a748b7f34 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -588,7 +588,7 @@ transport: # Edges parameters edges: rpc: - enabled: "${EDGES_RPC_ENABLED:true}" + enabled: "${EDGES_RPC_ENABLED:false}" port: "${EDGES_RPC_PORT:7070}" ssl: # Enable/disable SSL support diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 35be11839f..560522e021 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -322,6 +322,28 @@ message DeviceCredentialsRequestMsg { int64 deviceIdLSB = 2; } +message DeviceRpcCallMsg { + int64 deviceIdMSB = 1; + int64 deviceIdLSB = 2; + int64 requestIdMSB = 3; + int64 requestIdLSB = 4; + int64 expirationTime = 5; + bool oneway = 6; + string originServiceId = 7; + RpcRequestMsg requestMsg = 8; + RpcResponseMsg responseMsg = 9; +} + +message RpcRequestMsg { + string method = 1; + string params = 2; +} + +message RpcResponseMsg { + string response = 1; + string error = 2; +} + enum EdgeEntityType { DEVICE = 0; ASSET = 1; @@ -343,6 +365,7 @@ message UplinkMsg { repeated RelationRequestMsg relationRequestMsg = 9; repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10; repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11; + repeated DeviceRpcCallMsg deviceRpcCallMsg = 12; } message UplinkResponseMsg { @@ -374,6 +397,6 @@ message DownlinkMsg { repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 16; repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 17; repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 18; - + repeated DeviceRpcCallMsg deviceRpcCallMsg = 19; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index b7562aea84..2a362e6483 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -425,38 +425,43 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @Override public ListenableFuture> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { - switch (entityId.getEntityType()) { - case DEVICE: - case ASSET: - case ENTITY_VIEW: - ListenableFuture> originatorEdgeRelationsFuture = - relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); - return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> { - if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0 && - originatorEdgeRelations.get(0).getFrom() != null) { - return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); - } else { - return Collections.emptyList(); + if (EntityType.TENANT.equals(entityId.getEntityType())) { + TextPageData edgesByTenantId = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); + return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); + } else { + switch (entityId.getEntityType()) { + case DEVICE: + case ASSET: + case ENTITY_VIEW: + ListenableFuture> originatorEdgeRelationsFuture = + relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> { + if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0 && + originatorEdgeRelations.get(0).getFrom() != null) { + return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); + } else { + return Collections.emptyList(); + } + }, MoreExecutors.directExecutor()); + case DASHBOARD: + return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); + case RULE_CHAIN: + return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); + case USER: + User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); + if (userById == null) { + return Futures.immediateFuture(Collections.emptyList()); } - }, MoreExecutors.directExecutor()); - case DASHBOARD: - return convertToEdgeIds(findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); - case RULE_CHAIN: - return convertToEdgeIds(findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); - case USER: - User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); - if (userById == null) { + TextPageData edges; + if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) { + edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); + } else { + edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); + } + return convertToEdgeIds(Futures.immediateFuture(edges.getData())); + default: return Futures.immediateFuture(Collections.emptyList()); - } - TextPageData edges; - if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) { - edges = findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); - } else { - edges = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), new TextPageLink(Integer.MAX_VALUE)); - } - return convertToEdgeIds(Futures.immediateFuture(edges.getData())); - default: - return Futures.immediateFuture(Collections.emptyList()); + } } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index 1c0e44050d..b7cea6de20 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -33,29 +33,20 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.audit.ActionType; -import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.IdBased; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.TextPageData; -import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.plugin.ComponentType; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @@ -86,51 +77,12 @@ public class TbMsgPushToEdgeNode implements TbNode { public void onMsg(TbContext ctx, TbMsg msg) { if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) { log.debug("Ignoring msg from the cloud, msg [{}]", msg); + ctx.ack(msg); return; } if (isSupportedOriginator(msg.getOriginator().getEntityType())) { if (isSupportedMsgType(msg.getType())) { - ListenableFuture> getEdgeIdsFuture = getEdgeIdsByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator()); - Futures.addCallback(getEdgeIdsFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List edgeIds) { - if (edgeIds != null && !edgeIds.isEmpty()) { - for (EdgeId edgeId : edgeIds) { - try { - EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); - if (edgeEvent == null) { - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); - } else { - edgeEvent.setEdgeId(edgeId); - ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent event) { - ctx.tellNext(msg, SUCCESS); - } - - @Override - public void onFailure(Throwable th) { - log.error("Could not save edge event", th); - ctx.tellFailure(msg, th); - } - }, ctx.getDbCallbackExecutor()); - } - } catch (JsonProcessingException e) { - log.error("Failed to build edge event", e); - ctx.tellFailure(msg, e); - } - } - } - } - - @Override - public void onFailure(Throwable t) { - ctx.tellFailure(msg, t); - } - - }, ctx.getDbCallbackExecutor()); + processMsg(ctx, msg); } else { log.debug("Unsupported msg type {}", msg.getType()); ctx.tellFailure(msg, new RuntimeException("Unsupported msg type '" + msg.getType() + "'")); @@ -141,6 +93,50 @@ public class TbMsgPushToEdgeNode implements TbNode { } } + private void processMsg(TbContext ctx, TbMsg msg) { + ListenableFuture> getEdgeIdsFuture = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator()); + Futures.addCallback(getEdgeIdsFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List edgeIds) { + if (edgeIds != null && !edgeIds.isEmpty()) { + for (EdgeId edgeId : edgeIds) { + try { + EdgeEvent edgeEvent = buildEdgeEvent(msg, ctx); + if (edgeEvent == null) { + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType()); + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'")); + } else { + edgeEvent.setEdgeId(edgeId); + ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable EdgeEvent event) { + ctx.tellNext(msg, SUCCESS); + } + + @Override + public void onFailure(Throwable th) { + log.error("Could not save edge event", th); + ctx.tellFailure(msg, th); + } + }, ctx.getDbCallbackExecutor()); + } + } catch (JsonProcessingException e) { + log.error("Failed to build edge event", e); + ctx.tellFailure(msg, e); + } + } + } + } + + @Override + public void onFailure(Throwable t) { + ctx.tellFailure(msg, t); + } + + }, ctx.getDbCallbackExecutor()); + } + private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) throws JsonProcessingException { if (DataConstants.ALARM.equals(msg.getType())) { return buildEdgeEvent(ctx.getTenantId(), ActionType.ADDED, getUUIDFromMsgData(msg), EdgeEventType.ALARM, null); @@ -227,15 +223,6 @@ public class TbMsgPushToEdgeNode implements TbNode { || DataConstants.ALARM.equals(msgType); } - private ListenableFuture> getEdgeIdsByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) { - if (EntityType.TENANT.equals(originatorId.getEntityType())) { - TextPageData edgesByTenantId = ctx.getEdgeService().findEdgesByTenantId(tenantId, new TextPageLink(Integer.MAX_VALUE)); - return Futures.immediateFuture(edgesByTenantId.getData().stream().map(IdBased::getId).collect(Collectors.toList())); - } else { - return ctx.getEdgeService().findRelatedEdgeIdsByEntityId(tenantId, originatorId); - } - } - @Override public void destroy() { } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 301c7cd36c..fe3326c4bc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -16,13 +16,16 @@ package org.thingsboard.rule.engine.rpc; import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -30,13 +33,21 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.plugin.ComponentType; -import org.thingsboard.server.common.data.rule.RuleChainType; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.msg.TbMsg; +import javax.annotation.Nullable; +import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -55,6 +66,7 @@ import java.util.concurrent.TimeUnit; ) public class TbSendRPCRequestNode implements TbNode { + private static final ObjectMapper json = new ObjectMapper(); private Random random = new Random(); private Gson gson = new Gson(); private JsonParser jsonParser = new JsonParser(); @@ -111,19 +123,57 @@ public class TbSendRPCRequestNode implements TbNode { .restApiCall(restApiCall) .build(); - ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); - ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); - } else { - TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); - ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); - } - }); + EdgeId edgeId = findRelatedEdgeId(ctx, msg); + if (edgeId != null) { + sendRpcRequestToEdgeDevice(ctx, msg, edgeId, request); + } else { + ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { + if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); + } else { + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); + } + }); + } ctx.ack(msg); } } + private EdgeId findRelatedEdgeId(TbContext ctx, TbMsg msg) { + List result = + ctx.getRelationService().findByToAndType(ctx.getTenantId(), msg.getOriginator(), EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON); + if (result != null && result.size() > 0) { + return new EdgeId(result.get(0).getFrom().getId()); + } else { + return null; + } + } + + private void sendRpcRequestToEdgeDevice(TbContext ctx, TbMsg msg, EdgeId edgeId, RuleEngineDeviceRpcRequest request) { + EdgeEvent edgeEvent = new EdgeEvent(); + edgeEvent.setTenantId(ctx.getTenantId()); + edgeEvent.setEdgeEventAction(ActionType.RPC_CALL.name()); + edgeEvent.setEntityId(request.getDeviceId().getId()); + edgeEvent.setEdgeEventType(EdgeEventType.DEVICE); + edgeEvent.setEntityBody(json.valueToTree(request)); + edgeEvent.setEdgeId(edgeId); + ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable EdgeEvent event) { + ctx.tellSuccess(msg); + } + + @Override + public void onFailure(Throwable th) { + log.error("Could not save edge event", th); + ctx.tellFailure(msg, th); + } + }, ctx.getDbCallbackExecutor()); + } + @Override public void destroy() { }