From 649898566c4b5c652bd1d008bc4abcb9ef97744c Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 21 Jun 2023 19:05:42 +0300 Subject: [PATCH 1/2] Add unrecoverable initialization error for actors. Fix timeouts on missing rule chain id in the rule chain input node --- .../actors/ruleChain/DefaultTbContext.java | 5 +- .../ruleChain/RuleChainManagerActor.java | 9 ++- .../actors/shared/RuleChainErrorActor.java | 78 +++++++++++++++++++ .../server/actors/TbActorMailbox.java | 17 +++- .../server/common/msg/TbActorError.java | 22 ++++++ .../common/msg/aware/RuleChainAwareMsg.java | 3 + .../rule/engine/api/TbContext.java | 2 +- .../rule/engine/api/TbNodeException.java | 18 ++++- .../rule/engine/api/util/TbNodeUtils.java | 2 +- .../rule/engine/action/TbCreateAlarmNode.java | 2 +- 10 files changed, 149 insertions(+), 9 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 59d633540e..d32867c626 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -35,6 +35,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.slack.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; @@ -843,9 +844,9 @@ class DefaultTbContext implements TbContext { } @Override - public void checkTenantEntity(EntityId entityId) { + public void checkTenantEntity(EntityId entityId) throws TbNodeException { if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) { - throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant."); + throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index 9534104ec1..7f919754fc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.DefaultActorService; +import org.thingsboard.server.actors.shared.RuleChainErrorActor; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.dao.rule.RuleChainService; import java.util.function.Function; @@ -86,7 +88,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { () -> DefaultActorService.RULE_DISPATCHER_NAME, () -> { RuleChain ruleChain = provider.apply(ruleChainId); - return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); + if (ruleChain == null) { + return new RuleChainErrorActor.ActorCreator(systemContext, tenantId, + new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!")); + } else { + return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); + } }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java new file mode 100644 index 0000000000..a204c7b286 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.shared; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbStringActorId; +import org.thingsboard.server.actors.service.ContextAwareActor; +import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; +import org.thingsboard.server.common.msg.queue.RuleEngineException; + +import java.util.UUID; + +@Slf4j +public class RuleChainErrorActor extends ContextAwareActor { + + private final TenantId tenantId; + private final RuleEngineException error; + + private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) { + super(systemContext); + this.tenantId = tenantId; + this.error = error; + } + + @Override + protected boolean doProcess(TbActorMsg msg) { + if (msg instanceof RuleChainAwareMsg) { + log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg); + var rcMsg = (RuleChainAwareMsg) msg; + rcMsg.getMsg().getCallback().onFailure(error); + return true; + } else { + return false; + } + } + + public static class ActorCreator extends ContextBasedCreator { + + private final TenantId tenantId; + private final RuleEngineException error; + + public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) { + super(context); + this.tenantId = tenantId; + this.error = error; + } + + @Override + public TbActorId createActorId() { + return new TbStringActorId(UUID.randomUUID().toString()); + } + + @Override + public TbActor createActor() { + return new RuleChainErrorActor(context, tenantId, error); + } + } + +} diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index 587da8f3b1..ad1604f7b0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -19,6 +19,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorError; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorStopReason; @@ -69,9 +70,14 @@ public final class TbActorMailbox implements TbActorCtx { } } } catch (Throwable t) { - log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); + InitFailureStrategy strategy; int attemptIdx = attempt + 1; - InitFailureStrategy strategy = actor.onInitFailure(attempt, t); + if (isUnrecoverable(t)) { + strategy = InitFailureStrategy.stop(); + } else { + log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); + strategy = actor.onInitFailure(attempt, t); + } if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) { log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); stopReason = TbActorStopReason.INIT_FAILED; @@ -88,6 +94,13 @@ public final class TbActorMailbox implements TbActorCtx { } } + private static boolean isUnrecoverable(Throwable t) { + if (t instanceof TbActorException && t.getCause() != null) { + t = t.getCause(); + } + return t instanceof TbActorError && ((TbActorError) t).isUnrecoverable(); + } + private void enqueue(TbActorMsg msg, boolean highPriority) { if (!destroyInProgress.get()) { if (highPriority) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java new file mode 100644 index 0000000000..fc27feb096 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +public interface TbActorError { + + boolean isUnrecoverable(); + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java index d0e90ae421..5fbc857e0c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java @@ -17,9 +17,12 @@ package org.thingsboard.server.common.msg.aware; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.TbMsg; public interface RuleChainAwareMsg extends TbActorMsg { RuleChainId getRuleChainId(); + + TbMsg getMsg(); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1561dac05e..b47264c280 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -210,7 +210,7 @@ public interface TbContext { void schedule(Runnable runnable, long delay, TimeUnit timeUnit); - void checkTenantEntity(EntityId entityId); + void checkTenantEntity(EntityId entityId) throws TbNodeException; boolean isLocalEntity(EntityId entityId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java index 32341dbf2b..a8ce9d7e42 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java @@ -15,17 +15,33 @@ */ package org.thingsboard.rule.engine.api; +import lombok.Getter; +import org.thingsboard.server.common.msg.TbActorError; + /** * Created by ashvayka on 19.01.18. */ -public class TbNodeException extends Exception { +public class TbNodeException extends Exception implements TbActorError { + + @Getter + private final boolean unrecoverable; public TbNodeException(String message) { + this(message, false); + } + + public TbNodeException(String message, boolean unrecoverable) { super(message); + this.unrecoverable = unrecoverable; } public TbNodeException(Exception e) { + this(e, false); + } + + public TbNodeException(Exception e, boolean unrecoverable) { super(e); + this.unrecoverable = unrecoverable; } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 1ca1d2516e..73cac87832 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -45,7 +45,7 @@ public class TbNodeUtils { try { return mapper.treeToValue(configuration.getData(), clazz); } catch (JsonProcessingException e) { - throw new TbNodeException(e); + throw new TbNodeException(e, true); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java index de68324c98..5297269c45 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java @@ -67,7 +67,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode Date: Thu, 22 Jun 2023 15:59:36 +0300 Subject: [PATCH 2/2] Ability to force ack for external nodes --- .../server/actors/ActorSystemContext.java | 4 ++ .../actors/ruleChain/DefaultTbContext.java | 36 ++++++++--- .../src/main/resources/thingsboard.yml | 4 ++ .../thingsboard/server/common/msg/TbMsg.java | 5 ++ .../rule/engine/api/TbContext.java | 4 ++ .../rule/engine/aws/sns/TbSnsNode.java | 11 ++-- .../rule/engine/aws/sqs/TbSqsNode.java | 9 ++- .../external/TbAbstractExternalNode.java | 61 +++++++++++++++++++ .../rule/engine/gcp/pubsub/TbPubSubNode.java | 11 ++-- .../rule/engine/kafka/TbKafkaNode.java | 11 ++-- .../rule/engine/mail/TbSendEmailNode.java | 11 ++-- .../rule/engine/mqtt/TbMqttNode.java | 12 ++-- .../engine/mqtt/azure/TbAzureIotHubNode.java | 3 +- .../notification/TbNotificationNode.java | 23 ++++--- .../rule/engine/notification/TbSlackNode.java | 9 ++- .../rule/engine/rabbitmq/TbRabbitMqNode.java | 12 ++-- .../rule/engine/rest/TbHttpClient.java | 24 ++++---- .../rule/engine/rest/TbRestApiCallNode.java | 13 ++-- .../rule/engine/sms/TbSendSmsNode.java | 9 ++- .../rule/engine/rest/TbHttpClientTest.java | 4 +- 20 files changed, 200 insertions(+), 76 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 515c635e68..7e5dfed5f0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -538,6 +538,10 @@ public class ActorSystemContext { @Getter private int maxRpcRetries; + @Value("${actors.rule.external.force_ack:false}") + @Getter + private boolean externalNodeForceAck; + @Getter @Setter private TbActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index d32867c626..ca83852fe4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -218,6 +218,12 @@ class DefaultTbContext implements TbContext { enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } + @Override + public void enqueueForTellFailure(TbMsg tbMsg, Throwable th) { + TopicPartitionInfo tpi = resolvePartition(tbMsg); + enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), getFailureMessage(th), null, null); + } + @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { TopicPartitionInfo tpi = resolvePartition(tbMsg); @@ -315,16 +321,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th); } - String failureMessage; - if (th != null) { - if (!StringUtils.isEmpty(th.getMessage())) { - failureMessage = th.getMessage(); - } else { - failureMessage = th.getClass().getSimpleName(); - } - } else { - failureMessage = null; - } + String failureMessage = getFailureMessage(th); nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), msg, failureMessage)); @@ -728,6 +725,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getSlackService(); } + @Override + public boolean isExternalNodeForceAck() { + return mainCtx.isExternalNodeForceAck(); + } + @Override public RuleEngineRpcService getRpcService() { return mainCtx.getTbRuleEngineDeviceRpcService(); @@ -850,6 +852,20 @@ class DefaultTbContext implements TbContext { } } + private static String getFailureMessage(Throwable th) { + String failureMessage; + if (th != null) { + if (!StringUtils.isEmpty(th.getMessage())) { + failureMessage = th.getMessage(); + } else { + failureMessage = th.getClass().getSimpleName(); + } + } else { + failureMessage = null; + } + return failureMessage; + } + private class SimpleTbQueueCallback implements TbQueueCallback { private final Runnable onSuccess; private final Consumer onFailure; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96e409373e..589540096c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -389,6 +389,10 @@ actors: queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}" # Time in milliseconds for transaction to complete duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}" + external: + # Force acknowledgement of the incoming message for external rule nodes to decrease processing latency. + # Enqueue the result of external node processing as a separate message to the rule engine. + force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}" rpc: max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" sequential: "${ACTORS_RPC_SEQUENTIAL:false}" diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index f04104bc51..17bc7c0a8f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -277,6 +277,11 @@ public final class TbMsg implements Serializable { this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback); } + public TbMsg copyWithNewCtx() { + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId, + this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY); + } + public TbMsgCallback getCallback() { // May be null in case of deserialization; if (callback != null) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index b47264c280..88bf80e9f9 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -167,6 +167,8 @@ public interface TbContext { void enqueueForTellFailure(TbMsg msg, String failureMessage); + void enqueueForTellFailure(TbMsg tbMsg, Throwable t); + void enqueueForTellNext(TbMsg msg, String relationType); void enqueueForTellNext(TbMsg msg, Set relationTypes); @@ -302,6 +304,8 @@ public interface TbContext { SlackService getSlackService(); + boolean isExternalNodeForceAck(); + /** * Creates JS Script Engine * @deprecated diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java index f02434d4ee..bad37922e7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java @@ -26,10 +26,11 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -51,7 +52,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSnsConfig", iconUrl = "" ) -public class TbSnsNode implements TbNode { +public class TbSnsNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String REQUEST_ID = "requestId"; @@ -62,6 +63,7 @@ public class TbSnsNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSnsNodeConfiguration.class); AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey()); AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); @@ -78,8 +80,9 @@ public class TbSnsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> ctx.tellFailure(processException(ctx, msg, t), t)); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java index 7386d30c7c..f8ebc8e295 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java @@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -55,7 +56,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSqsConfig", iconUrl = "" ) -public class TbSqsNode implements TbNode { +public class TbSqsNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String REQUEST_ID = "requestId"; @@ -69,6 +70,7 @@ public class TbSqsNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSqsNodeConfiguration.class); AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey()); AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); @@ -85,8 +87,9 @@ public class TbSqsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> ctx.tellFailure(processException(ctx, msg, t), t)); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java new file mode 100644 index 0000000000..f5f8c34ab3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.external; + +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.server.common.msg.TbMsg; + +public abstract class TbAbstractExternalNode implements TbNode { + + private boolean forceAck; + + public void init(TbContext ctx) { + this.forceAck = ctx.isExternalNodeForceAck(); + } + + protected void tellSuccess(TbContext ctx, TbMsg tbMsg) { + if (forceAck) { + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.SUCCESS); + } else { + ctx.tellSuccess(tbMsg); + } + } + + protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) { + if (forceAck) { + if (t == null) { + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.FAILURE); + } else { + ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t); + } + } else { + if (t == null) { + ctx.tellNext(tbMsg, TbRelationTypes.FAILURE); + } else { + ctx.tellFailure(tbMsg, t); + } + } + } + + protected void ackIfNeeded(TbContext ctx, TbMsg msg) { + if (forceAck) { + ctx.ack(msg); + } + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index b0198e210c..3b927f05a6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit; configDirective = "tbExternalNodePubSubConfig", iconUrl = "" ) -public class TbPubSubNode implements TbNode { +public class TbPubSubNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String ERROR = "error"; @@ -63,8 +64,9 @@ public class TbPubSubNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); try { - this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); this.pubSubClient = initPubSubClient(); } catch (Exception e) { throw new TbNodeException(e); @@ -74,6 +76,7 @@ public class TbPubSubNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { publishMessage(ctx, msg); + ackIfNeeded(ctx, msg); } @Override @@ -101,12 +104,12 @@ public class TbPubSubNode implements TbNode { ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { public void onSuccess(String messageId) { TbMsg next = processPublishResult(ctx, msg, messageId); - ctx.tellSuccess(next); + tellSuccess(ctx, next); } public void onFailure(Throwable t) { TbMsg next = processException(ctx, msg, t); - ctx.tellFailure(next, t); + tellFailure(ctx, next, t); } }, ctx.getExternalCallExecutor()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index de1abea224..ea8e3edb9d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -56,7 +57,7 @@ import java.util.Properties; configDirective = "tbExternalNodeKafkaConfig", iconUrl = "" ) -public class TbKafkaNode implements TbNode { +public class TbKafkaNode extends TbAbstractExternalNode { private static final String OFFSET = "offset"; private static final String PARTITION = "partition"; @@ -78,6 +79,7 @@ public class TbKafkaNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class); this.initError = null; Properties properties = new Properties(); @@ -129,6 +131,7 @@ public class TbKafkaNode implements TbNode { return null; }); } + ackIfNeeded(ctx, msg); } catch (Exception e) { ctx.tellFailure(msg, e); } @@ -164,11 +167,9 @@ public class TbKafkaNode implements TbNode { private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) { if (e == null) { - TbMsg next = processResponse(ctx, msg, metadata); - ctx.tellNext(next, TbRelationTypes.SUCCESS); + tellSuccess(ctx, processResponse(ctx, msg, metadata)); } else { - TbMsg next = processException(ctx, msg, e); - ctx.tellFailure(next, e); + tellFailure(ctx, processException(ctx, msg, e), e); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java index 3224465d68..3afea4f817 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -47,7 +48,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSendEmailConfig", icon = "send" ) -public class TbSendEmailNode implements TbNode { +public class TbSendEmailNode extends TbAbstractExternalNode { private static final String MAIL_PROP = "mail."; static final String SEND_EMAIL_TYPE = "SEND_EMAIL"; @@ -58,8 +59,9 @@ public class TbSendEmailNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class); try { - this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class); if (!this.config.isUseSystemSmtpSettings()) { mailSender = createMailSender(); } @@ -77,8 +79,9 @@ public class TbSendEmailNode implements TbNode { sendEmail(ctx, msg, email); return null; }), - ok -> ctx.tellSuccess(msg), - fail -> ctx.tellFailure(msg, fail)); + ok -> tellSuccess(ctx, msg), + fail -> tellFailure(ctx, msg, fail)); + ackIfNeeded(ctx, msg); } catch (Exception ex) { ctx.tellFailure(msg, ex); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 478e733958..8fac7b1683 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.credentials.BasicCredentials; import org.thingsboard.rule.engine.credentials.ClientCredentials; import org.thingsboard.rule.engine.credentials.CredentialsType; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; @@ -55,7 +56,7 @@ import java.util.concurrent.TimeoutException; configDirective = "tbExternalNodeMqttConfig", icon = "call_split" ) -public class TbMqttNode implements TbNode { +public class TbMqttNode extends TbAbstractExternalNode { private static final Charset UTF8 = Charset.forName("UTF-8"); @@ -67,8 +68,9 @@ public class TbMqttNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); try { - this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); this.mqttClient = initClient(ctx); } catch (Exception e) { throw new TbNodeException(e); @@ -81,13 +83,13 @@ public class TbMqttNode implements TbNode { this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) .addListener(future -> { if (future.isSuccess()) { - ctx.tellSuccess(msg); + tellSuccess(ctx, msg); } else { - TbMsg next = processException(ctx, msg, future.cause()); - ctx.tellFailure(next, future.cause()); + tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause()); } } ); + ackIfNeeded(ctx, msg); } private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 5720f66038..3ea96fa967 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -48,8 +48,9 @@ import javax.net.ssl.SSLException; public class TbAzureIotHubNode extends TbMqttNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); try { - this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); mqttNodeConfiguration.setPort(8883); mqttNodeConfiguration.setCleanSession(true); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java index 057702cb2b..bf57628d64 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java @@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.notification.NotificationRequestConfig; import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo; @@ -42,12 +43,13 @@ import java.util.concurrent.ExecutionException; configDirective = "tbExternalNodeNotificationConfig", icon = "notifications" ) -public class TbNotificationNode implements TbNode { +public class TbNotificationNode extends TbAbstractExternalNode { private TbNotificationNodeConfiguration config; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbNotificationNodeConfiguration.class); } @@ -69,15 +71,16 @@ public class TbNotificationNode implements TbNode { .originatorEntityId(ctx.getSelf().getRuleChainId()) .build(); - DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> { - return ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { - TbMsgMetaData metaData = msg.getMetaData().copy(); - metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); - ctx.tellSuccess(TbMsg.transformMsg(msg, metaData)); - }); - }), - r -> {}, - e -> ctx.tellFailure(msg, e)); + DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> + ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { + TbMsgMetaData metaData = msg.getMetaData().copy(); + metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); + tellSuccess(ctx, TbMsg.transformMsg(msg, metaData)); + })), + r -> { + }, + e -> tellFailure(ctx, msg, e)); + ackIfNeeded(ctx, msg); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java index 544a864931..fd56043848 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java @@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -37,12 +38,13 @@ import java.util.concurrent.ExecutionException; configDirective = "tbExternalNodeSlackConfig", iconUrl = "" ) -public class TbSlackNode implements TbNode { +public class TbSlackNode extends TbAbstractExternalNode { private TbSlackNodeConfiguration config; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSlackNodeConfiguration.class); } @@ -62,8 +64,9 @@ public class TbSlackNode implements TbNode { DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> { ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message); }), - r -> ctx.tellSuccess(msg), - e -> ctx.tellFailure(msg, e)); + r -> tellSuccess(ctx, msg), + e -> tellFailure(ctx, msg, e)); + ackIfNeeded(ctx, msg); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java index 41dffe2fd8..4ae634902f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java @@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -48,7 +49,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeRabbitMqConfig", iconUrl = "" ) -public class TbRabbitMqNode implements TbNode { +public class TbRabbitMqNode extends TbAbstractExternalNode { private static final Charset UTF8 = Charset.forName("UTF-8"); @@ -61,6 +62,7 @@ public class TbRabbitMqNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(this.config.getHost()); @@ -83,11 +85,9 @@ public class TbRabbitMqNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> { - TbMsg next = processException(ctx, msg, t); - ctx.tellFailure(next, t); - }); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index 6f9541f5b1..28e221b0b1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; @Data @Slf4j @@ -182,14 +183,16 @@ public class TbHttpClient { } } - public void processMessage(TbContext ctx, TbMsg msg) { + public void processMessage(TbContext ctx, TbMsg msg, + Consumer onSuccess, + BiConsumer onFailure) { String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); HttpHeaders headers = prepareHeaders(msg); HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); HttpEntity entity; - if(HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) || - HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) || - config.isIgnoreRequestBody()) { + if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) || + HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) || + config.isIgnoreRequestBody()) { entity = new HttpEntity<>(headers); } else { entity = new HttpEntity<>(getData(msg), headers); @@ -198,21 +201,18 @@ public class TbHttpClient { URI uri = buildEncodedUri(endpointUrl); ListenableFuture> future = httpClient.exchange( uri, method, entity, String.class); - future.addCallback(new ListenableFutureCallback>() { + future.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable throwable) { - TbMsg next = processException(ctx, msg, throwable); - ctx.tellFailure(next, throwable); + onFailure.accept(processException(ctx, msg, throwable), throwable); } @Override public void onSuccess(ResponseEntity responseEntity) { if (responseEntity.getStatusCode().is2xxSuccessful()) { - TbMsg next = processResponse(ctx, msg, responseEntity); - ctx.tellSuccess(next); + onSuccess.accept(processResponse(ctx, msg, responseEntity)); } else { - TbMsg next = processFailureResponse(ctx, msg, responseEntity); - ctx.tellNext(next, TbRelationTypes.FAILURE); + onFailure.accept(processFailureResponse(ctx, msg, responseEntity), null); } } }); @@ -248,7 +248,7 @@ public class TbHttpClient { if (config.isTrimDoubleQuotes()) { final String dataBefore = data; - data = data.replaceAll("^\"|\"$", "");; + data = data.replaceAll("^\"|\"$", ""); log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java index d2356c69f7..94b0e5d078 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java @@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -43,24 +44,26 @@ import org.thingsboard.server.common.msg.TbMsg; configDirective = "tbExternalNodeRestApiCallConfig", iconUrl = "" ) -public class TbRestApiCallNode implements TbNode { +public class TbRestApiCallNode extends TbAbstractExternalNode { - private boolean useRedisQueueForMsgPersistence; protected TbHttpClient httpClient; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class); httpClient = new TbHttpClient(config, ctx.getSharedEventLoop()); - useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence(); - if (useRedisQueueForMsgPersistence) { + if (config.isUseRedisQueueForMsgPersistence()) { log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId()); } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - httpClient.processMessage(ctx, msg); + httpClient.processMessage(ctx, msg, + m -> tellSuccess(ctx, m), + (m, t) -> tellFailure(ctx, m, t)); + ackIfNeeded(ctx, msg); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index e92b15780a..f6cbccd4d4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.sms.SmsSender; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -39,13 +40,14 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSendSmsConfig", icon = "sms" ) -public class TbSendSmsNode implements TbNode { +public class TbSendSmsNode extends TbAbstractExternalNode { private TbSendSmsNodeConfiguration config; private SmsSender smsSender; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); try { this.config = TbNodeUtils.convert(configuration, TbSendSmsNodeConfiguration.class); if (!this.config.isUseSystemSmsSettings()) { @@ -63,8 +65,9 @@ public class TbSendSmsNode implements TbNode { sendSms(ctx, msg); return null; }), - ok -> ctx.tellSuccess(msg), - fail -> ctx.tellFailure(msg, fail)); + ok -> tellSuccess(ctx, msg), + fail -> tellFailure(ctx, msg, fail)); + ackIfNeeded(ctx, msg); } catch (Exception ex) { ctx.tellFailure(msg, ex); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java index 14ab6cfbbf..7b6a54ae0b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java @@ -167,7 +167,9 @@ public class TbHttpClientTest { capturedData.capture() )).thenReturn(successMsg); - httpClient.processMessage(ctx, msg); + httpClient.processMessage(ctx, msg, + m -> ctx.tellSuccess(msg), + (m, t) -> ctx.tellFailure(m, t)); Awaitility.await() .atMost(30, TimeUnit.SECONDS)