diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index a5abd77ad5..b7f6bd3dd2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; @@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get(), ActorRef.noSender()); + TbActorMsg tbActorMsg = actorMsg.get(); + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg); + } else { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); + actorContext.tell(actorMsg.get(), ActorRef.noSender()); + } } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 0c48c54412..182a62d397 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.stats.RuleEngineStatisticsService; import javax.annotation.PostConstruct; @@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory; private final TbQueueRuleEngineSettings ruleEngineSettings; private final RuleEngineStatisticsService statisticsService; + private final TbRuleEngineDeviceRpcService tbDeviceRpcService; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); @@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbRuleEngineSubmitStrategyFactory submitStrategyFactory, TbQueueRuleEngineSettings ruleEngineSettings, TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, - ActorSystemContext actorContext, DataDecodingEncodingService encodingService) { + ActorSystemContext actorContext, DataDecodingEncodingService encodingService, + TbRuleEngineDeviceRpcService tbDeviceRpcService) { super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); this.statisticsService = statisticsService; this.ruleEngineSettings = ruleEngineSettings; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; this.submitStrategyFactory = submitStrategyFactory; this.processingStrategyFactory = processingStrategyFactory; + this.tbDeviceRpcService = tbDeviceRpcService; } @PostConstruct @@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); + } else if (nfMsg.hasFromDeviceRpcResponse()) { + TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); + RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) + , proto.getResponse(), error); + tbDeviceRpcService.processRpcResponseFromDevice(response); + callback.onSuccess(); } else { + log.trace("Received notification with missing handler"); callback.onSuccess(); } }