diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 48fa123eeb..223fb94937 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -95,6 +95,7 @@ public class AppActor extends ContextAwareActor { case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((TenantAwareMsg) msg, true); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 3e06b8387c..47bc554532 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -84,6 +85,9 @@ public class DeviceActor extends ContextAwareActor { case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg); break; + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG: + processor.processRemoveRpc(ctx, (RemoveRpcActorMsg) msg); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 137b8b8863..6224a3635d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -88,6 +88,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceAct import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -263,6 +264,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } + void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) { + log.debug("[{}] Processing remove rpc command", msg.getRequestId()); + Integer requestId = null; + for (Map.Entry entry : toDeviceRpcPendingMap.entrySet()) { + if (entry.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) { + requestId = entry.getKey(); + break; + } + } + + if (requestId != null) { + toDeviceRpcPendingMap.remove(requestId); + } + } + private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 1a74e368f5..c995aadcae 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -165,6 +165,7 @@ public class TenantActor extends RuleChainManagerActor { case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: + case REMOVE_RPC_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((DeviceAwareMsg) msg, true); break; case RULE_CHAIN_TO_RULE_CHAIN_MSG: diff --git a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java index ba14a353d1..43b6af1626 100644 --- a/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java +++ b/application/src/main/java/org/thingsboard/server/controller/AbstractRpcController.java @@ -57,7 +57,7 @@ import java.util.UUID; public abstract class AbstractRpcController extends BaseController { @Autowired - private TbCoreDeviceRpcService deviceRpcService; + protected TbCoreDeviceRpcService deviceRpcService; @Autowired private AccessValidator accessValidator; diff --git a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java index 725efbaf43..ae9081c39a 100644 --- a/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java +++ b/application/src/main/java/org/thingsboard/server/controller/RpcV2Controller.java @@ -27,6 +27,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RpcId; @@ -35,11 +36,16 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rpc.Rpc; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; import org.thingsboard.server.service.security.permission.Operation; import java.util.UUID; +import static org.thingsboard.server.common.data.DataConstants.RPC_DELETED; + @RestController @TbCoreComponent @RequestMapping(TbUrlConstants.RPC_V2_URL_PREFIX) @@ -100,7 +106,21 @@ public class RpcV2Controller extends AbstractRpcController { public void deleteResource(@PathVariable("rpcId") String strRpc) throws ThingsboardException { checkParameter("RpcId", strRpc); try { - rpcService.deleteRpc(getTenantId(), new RpcId(UUID.fromString(strRpc))); + RpcId rpcId = new RpcId(UUID.fromString(strRpc)); + Rpc rpc = checkRpcId(rpcId, Operation.DELETE); + + if (rpc != null) { + if (rpc.getStatus().equals(RpcStatus.QUEUED)) { + RemoveRpcActorMsg removeMsg = new RemoveRpcActorMsg(getTenantId(), rpc.getDeviceId(), rpc.getUuidId()); + log.trace("[{}] Forwarding msg {} to queue actor!", rpc.getDeviceId(), rpc); + tbClusterService.pushMsgToCore(removeMsg, null); + } + + rpcService.deleteRpc(getTenantId(), rpcId); + + TbMsg msg = TbMsg.newMsg(RPC_DELETED, rpc.getDeviceId(), TbMsgMetaData.EMPTY, JacksonUtil.toString(rpc)); + tbClusterService.pushMsgToRuleEngine(getTenantId(), rpc.getDeviceId(), msg, null); + } } catch (Exception e) { throw handleException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 036beaea48..46e49fcdc8 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -139,6 +139,12 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } } + @Override + public void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg) { + log.trace("[{}][{}] Processing remove RPC [{}]", removeRpcMsg.getTenantId(), removeRpcMsg.getRequestId(), removeRpcMsg.getDeviceId()); + actorContext.tellWithHighPriority(removeRpcMsg); + } + private void sendRpcResponseToTbRuleEngine(String originServiceId, FromDeviceRpcResponse response) { if (serviceId.equals(originServiceId)) { if (tbRuleEngineRpcService.isPresent()) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java new file mode 100644 index 0000000000..94d4bec148 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java @@ -0,0 +1,44 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.rpc; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; + +import java.util.UUID; + +@ToString +@RequiredArgsConstructor +public class RemoveRpcActorMsg implements ToDeviceActorNotificationMsg { + + @Getter + private final TenantId tenantId; + @Getter + private final DeviceId deviceId; + + @Getter + private final UUID requestId; + + @Override + public MsgType getMsgType() { + return MsgType.REMOVE_RPC_TO_DEVICE_ACTOR_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java index db22e1ea64..c68b4a7ac7 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java @@ -56,4 +56,6 @@ public interface TbCoreDeviceRpcService { */ void processRpcResponseFromDeviceActor(FromDeviceRpcResponse response); + void processRemoveRpc(RemoveRpcActorMsg removeRpcMsg); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index d42efcd770..ae51749056 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -89,6 +89,7 @@ public class DataConstants { public static final String RPC_SUCCESSFUL = "RPC_SUCCESSFUL"; public static final String RPC_TIMEOUT = "RPC_TIMEOUT"; public static final String RPC_FAILED = "RPC_FAILED"; + public static final String RPC_DELETED = "RPC_DELETED"; public static final String DEFAULT_SECRET_KEY = ""; public static final String SECRET_KEY_FIELD_NAME = "secretKey"; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index d9f05e3c84..105c624eba 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -92,6 +92,8 @@ public enum MsgType { DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG, + REMOVE_RPC_TO_DEVICE_ACTOR_MSG, + /** * Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement */ diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index 85c2a156b3..cda3269791 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; type = ComponentType.FILTER, name = "message type switch", configClazz = EmptyNodeConfiguration.class, - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", + relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Failed", "RPC Deleted", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", "Timeseries Updated", "Timeseries Deleted"}, @@ -105,6 +105,8 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "RPC Timeout"; } else if (msg.getType().equals(DataConstants.RPC_FAILED)) { relationType = "RPC Failed"; + } else if (msg.getType().equals(DataConstants.RPC_DELETED)) { + relationType = "RPC Deleted"; } else { relationType = "Other"; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 336dbeb6ce..a6d0a23ee0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -118,7 +118,7 @@ public class TbSendRPCRequestNode implements TbNode { .build(); ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { - if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { + if (ruleEngineDeviceRpcResponse.getError().isEmpty()) { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else {