From 94bb9f5be3c3ca878b9ed5d6b95a7e3156c29ea0 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 6 Oct 2020 18:59:18 +0300 Subject: [PATCH] update the fix in the API --- .../server/actors/ruleChain/DefaultTbContext.java | 10 +++++----- .../rule/engine/rpc/TbSendRPCRequestNode.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index cf49982f9d..42993273e2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -138,31 +138,31 @@ class DefaultTbContext implements TbContext { @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index df3ed752b3..2bd7f315fe 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -113,7 +113,7 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); - ctx.enqueueForTellNext(next, next.getQueueName(), TbRelationTypes.SUCCESS, null, null); + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name()));