From 788ac46f080fd655278067f5a5b186f7f8dbc915 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Tue, 11 Jul 2023 16:13:55 +0300 Subject: [PATCH 1/2] External rule nodes improvement: When force ack create new message to free memory after acknowledgment. Set request timeout for some external nodes. --- .../rule/engine/aws/sns/TbSnsNode.java | 9 +++++++-- .../rule/engine/aws/sqs/TbSqsNode.java | 10 +++++++--- .../engine/external/TbAbstractExternalNode.java | 5 ++++- .../rule/engine/gcp/pubsub/TbPubSubNode.java | 15 ++++++++++++++- .../rule/engine/kafka/TbKafkaNode.java | 10 +++++----- .../rule/engine/mail/TbSendEmailNode.java | 8 ++++---- .../thingsboard/rule/engine/mqtt/TbMqttNode.java | 8 ++++---- .../engine/notification/TbNotificationNode.java | 9 +++++---- .../rule/engine/notification/TbSlackNode.java | 6 +++--- .../rule/engine/rabbitmq/TbRabbitMqNode.java | 6 +++--- .../rule/engine/rest/TbRestApiCallNode.java | 4 ++-- .../rule/engine/sms/TbSendSmsNode.java | 10 +++++----- 12 files changed, 63 insertions(+), 37 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java index bad37922e7..bb93cb8d63 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.aws.sns; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -71,6 +72,9 @@ public class TbSnsNode extends TbAbstractExternalNode { this.snsClient = AmazonSNSClient.builder() .withCredentials(credProvider) .withRegion(this.config.getRegion()) + .withClientConfiguration(new ClientConfiguration() + .withConnectionTimeout(10000) + .withRequestTimeout(5000)) .build(); } catch (Exception e) { throw new TbNodeException(e); @@ -79,9 +83,10 @@ public class TbSnsNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - withCallback(publishMessageAsync(ctx, msg), + var tbMsg = ackIfNeeded(ctx, msg); + withCallback(publishMessageAsync(ctx, tbMsg), m -> tellSuccess(ctx, m), - t -> tellFailure(ctx, processException(ctx, msg, t), t)); + t -> tellFailure(ctx, processException(ctx, tbMsg, t), t)); ackIfNeeded(ctx, msg); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java index f8ebc8e295..2b9f8916cf 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.aws.sqs; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -78,6 +79,9 @@ public class TbSqsNode extends TbAbstractExternalNode { this.sqsClient = AmazonSQSClientBuilder.standard() .withCredentials(credProvider) .withRegion(this.config.getRegion()) + .withClientConfiguration(new ClientConfiguration() + .withConnectionTimeout(10000) + .withRequestTimeout(5000)) .build(); } catch (Exception e) { throw new TbNodeException(e); @@ -86,10 +90,10 @@ public class TbSqsNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - withCallback(publishMessageAsync(ctx, msg), + var tbMsg = ackIfNeeded(ctx, msg); + withCallback(publishMessageAsync(ctx, tbMsg), m -> tellSuccess(ctx, m), - t -> tellFailure(ctx, processException(ctx, msg, t), t)); - ackIfNeeded(ctx, msg); + t -> tellFailure(ctx, processException(ctx, tbMsg, t), t)); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java index f5f8c34ab3..834dbe2390 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java @@ -52,9 +52,12 @@ public abstract class TbAbstractExternalNode implements TbNode { } } - protected void ackIfNeeded(TbContext ctx, TbMsg msg) { + protected TbMsg ackIfNeeded(TbContext ctx, TbMsg msg) { if (forceAck) { ctx.ack(msg); + return msg.copyWithNewCtx(); + } else { + return msg; } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index 3b927f05a6..18d817fd94 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; @@ -36,6 +37,7 @@ import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.threeten.bp.Duration; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -75,8 +77,8 @@ public class TbPubSubNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { + msg = ackIfNeeded(ctx, msg); publishMessage(ctx, msg); - ackIfNeeded(ctx, msg); } @Override @@ -134,8 +136,19 @@ public class TbPubSubNode extends TbAbstractExternalNode { new ByteArrayInputStream(config.getServiceAccountKey().getBytes())); CredentialsProvider credProvider = FixedCredentialsProvider.create(credentials); + var retrySettings = RetrySettings.newBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setInitialRetryDelay(Duration.ofMillis(50)) + .setRetryDelayMultiplier(1.1) + .setMaxRetryDelay(Duration.ofSeconds(2)) + .setInitialRpcTimeout(Duration.ofSeconds(2)) + .setRpcTimeoutMultiplier(1) + .setMaxRpcTimeout(Duration.ofSeconds(10)) + .build(); + return Publisher.newBuilder(topicName) .setCredentialsProvider(credProvider) + .setRetrySettings(retrySettings) .build(); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index ea8e3edb9d..760eec2421 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -115,25 +115,25 @@ public class TbKafkaNode extends TbAbstractExternalNode { public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg); String keyPattern = config.getKeyPattern(); + var tbMsg = ackIfNeeded(ctx, msg); try { if (initError != null) { - ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage())); + ctx.tellFailure(tbMsg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage())); } else { ctx.getExternalCallExecutor().executeAsync(() -> { publish( ctx, - msg, + tbMsg, topic, keyPattern == null || keyPattern.isEmpty() ? null - : TbNodeUtils.processPattern(config.getKeyPattern(), msg) + : TbNodeUtils.processPattern(config.getKeyPattern(), tbMsg) ); return null; }); } - ackIfNeeded(ctx, msg); } catch (Exception e) { - ctx.tellFailure(msg, e); + ctx.tellFailure(tbMsg, e); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java index 3afea4f817..3e705d9f08 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java @@ -75,13 +75,13 @@ public class TbSendEmailNode extends TbAbstractExternalNode { try { validateType(msg.getType()); TbEmail email = getEmail(msg); + var tbMsg = ackIfNeeded(ctx, msg); withCallback(ctx.getMailExecutor().executeAsync(() -> { - sendEmail(ctx, msg, email); + sendEmail(ctx, tbMsg, email); return null; }), - ok -> tellSuccess(ctx, msg), - fail -> tellFailure(ctx, msg, fail)); - ackIfNeeded(ctx, msg); + ok -> tellSuccess(ctx, tbMsg), + fail -> tellFailure(ctx, tbMsg, fail)); } catch (Exception ex) { ctx.tellFailure(msg, ex); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 8fac7b1683..121b9fb756 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -80,16 +80,16 @@ public class TbMqttNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg); - this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) + var tbMsg = ackIfNeeded(ctx, msg); + this.mqttClient.publish(topic, Unpooled.wrappedBuffer(tbMsg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) .addListener(future -> { if (future.isSuccess()) { - tellSuccess(ctx, msg); + tellSuccess(ctx, tbMsg); } else { - tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause()); + tellFailure(ctx, processException(ctx, tbMsg, future.cause()), future.cause()); } } ); - ackIfNeeded(ctx, msg); } private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java index bf57628d64..f1fd9727fa 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java @@ -71,16 +71,17 @@ public class TbNotificationNode extends TbAbstractExternalNode { .originatorEntityId(ctx.getSelf().getRuleChainId()) .build(); + var tbMsg = ackIfNeeded(ctx, msg); + DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { - TbMsgMetaData metaData = msg.getMetaData().copy(); + TbMsgMetaData metaData = tbMsg.getMetaData().copy(); metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); - tellSuccess(ctx, TbMsg.transformMsg(msg, metaData)); + tellSuccess(ctx, TbMsg.transformMsg(tbMsg, metaData)); })), r -> { }, - e -> tellFailure(ctx, msg, e)); - ackIfNeeded(ctx, msg); + e -> tellFailure(ctx, tbMsg, e)); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java index fd56043848..86e2c4bd1c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java @@ -61,12 +61,12 @@ public class TbSlackNode extends TbAbstractExternalNode { } String message = TbNodeUtils.processPattern(config.getMessageTemplate(), msg); + var tbMsg = ackIfNeeded(ctx, msg); DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> { ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message); }), - r -> tellSuccess(ctx, msg), - e -> tellFailure(ctx, msg, e)); - ackIfNeeded(ctx, msg); + r -> tellSuccess(ctx, tbMsg), + e -> tellFailure(ctx, tbMsg, e)); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java index 4ae634902f..4b42ee1d18 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java @@ -84,10 +84,10 @@ public class TbRabbitMqNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - withCallback(publishMessageAsync(ctx, msg), + var tbMsg = ackIfNeeded(ctx, msg); + withCallback(publishMessageAsync(ctx, tbMsg), m -> tellSuccess(ctx, m), - t -> tellFailure(ctx, processException(ctx, msg, t), t)); - ackIfNeeded(ctx, msg); + t -> tellFailure(ctx, processException(ctx, tbMsg, t), t)); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java index 94b0e5d078..c6083f1098 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java @@ -60,10 +60,10 @@ public class TbRestApiCallNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - httpClient.processMessage(ctx, msg, + var tbMsg = ackIfNeeded(ctx, msg); + httpClient.processMessage(ctx, tbMsg, m -> tellSuccess(ctx, m), (m, t) -> tellFailure(ctx, m, t)); - ackIfNeeded(ctx, msg); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index f6cbccd4d4..55209e1f64 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -60,16 +60,16 @@ public class TbSendSmsNode extends TbAbstractExternalNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { + var tbMsg = ackIfNeeded(ctx, msg); try { withCallback(ctx.getSmsExecutor().executeAsync(() -> { - sendSms(ctx, msg); + sendSms(ctx, tbMsg); return null; }), - ok -> tellSuccess(ctx, msg), - fail -> tellFailure(ctx, msg, fail)); - ackIfNeeded(ctx, msg); + ok -> tellSuccess(ctx, tbMsg), + fail -> tellFailure(ctx, tbMsg, fail)); } catch (Exception ex) { - ctx.tellFailure(msg, ex); + ctx.tellFailure(tbMsg, ex); } } From 3c2d5eeb30fb7e249d52ce04a8924c190af1fbc6 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Tue, 11 Jul 2023 16:15:51 +0300 Subject: [PATCH 2/2] Minor fix --- .../main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java index bb93cb8d63..a8a1137a98 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java @@ -87,7 +87,6 @@ public class TbSnsNode extends TbAbstractExternalNode { withCallback(publishMessageAsync(ctx, tbMsg), m -> tellSuccess(ctx, m), t -> tellFailure(ctx, processException(ctx, tbMsg, t), t)); - ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) {