Merge branch 'master' into develop/3.5.2

This commit is contained in:
Igor Kulikov 2023-07-11 20:19:56 +03:00
commit 7186f30c60
12 changed files with 63 additions and 38 deletions

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.aws.sns;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@ -71,6 +72,9 @@ public class TbSnsNode extends TbAbstractExternalNode {
this.snsClient = AmazonSNSClient.builder()
.withCredentials(credProvider)
.withRegion(this.config.getRegion())
.withClientConfiguration(new ClientConfiguration()
.withConnectionTimeout(10000)
.withRequestTimeout(5000))
.build();
} catch (Exception e) {
throw new TbNodeException(e);
@ -79,10 +83,10 @@ public class TbSnsNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
withCallback(publishMessageAsync(ctx, msg),
var tbMsg = ackIfNeeded(ctx, msg);
withCallback(publishMessageAsync(ctx, tbMsg),
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
t -> tellFailure(ctx, processException(ctx, tbMsg, t), t));
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.aws.sqs;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@ -78,6 +79,9 @@ public class TbSqsNode extends TbAbstractExternalNode {
this.sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(credProvider)
.withRegion(this.config.getRegion())
.withClientConfiguration(new ClientConfiguration()
.withConnectionTimeout(10000)
.withRequestTimeout(5000))
.build();
} catch (Exception e) {
throw new TbNodeException(e);
@ -86,10 +90,10 @@ public class TbSqsNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(publishMessageAsync(ctx, msg),
var tbMsg = ackIfNeeded(ctx, msg);
withCallback(publishMessageAsync(ctx, tbMsg),
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
t -> tellFailure(ctx, processException(ctx, tbMsg, t), t));
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -52,9 +52,12 @@ public abstract class TbAbstractExternalNode implements TbNode {
}
}
protected void ackIfNeeded(TbContext ctx, TbMsg msg) {
protected TbMsg ackIfNeeded(TbContext ctx, TbMsg msg) {
if (forceAck) {
ctx.ack(msg);
return msg.copyWithNewCtx();
} else {
return msg;
}
}

View File

@ -20,6 +20,7 @@ import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
@ -36,6 +37,7 @@ import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.threeten.bp.Duration;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -75,8 +77,8 @@ public class TbPubSubNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
msg = ackIfNeeded(ctx, msg);
publishMessage(ctx, msg);
ackIfNeeded(ctx, msg);
}
@Override
@ -134,8 +136,19 @@ public class TbPubSubNode extends TbAbstractExternalNode {
new ByteArrayInputStream(config.getServiceAccountKey().getBytes()));
CredentialsProvider credProvider = FixedCredentialsProvider.create(credentials);
var retrySettings = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(10))
.setInitialRetryDelay(Duration.ofMillis(50))
.setRetryDelayMultiplier(1.1)
.setMaxRetryDelay(Duration.ofSeconds(2))
.setInitialRpcTimeout(Duration.ofSeconds(2))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build();
return Publisher.newBuilder(topicName)
.setCredentialsProvider(credProvider)
.setRetrySettings(retrySettings)
.build();
}
}

View File

@ -115,25 +115,25 @@ public class TbKafkaNode extends TbAbstractExternalNode {
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
String keyPattern = config.getKeyPattern();
var tbMsg = ackIfNeeded(ctx, msg);
try {
if (initError != null) {
ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage()));
ctx.tellFailure(tbMsg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage()));
} else {
ctx.getExternalCallExecutor().executeAsync(() -> {
publish(
ctx,
msg,
tbMsg,
topic,
keyPattern == null || keyPattern.isEmpty()
? null
: TbNodeUtils.processPattern(config.getKeyPattern(), msg)
: TbNodeUtils.processPattern(config.getKeyPattern(), tbMsg)
);
return null;
});
}
ackIfNeeded(ctx, msg);
} catch (Exception e) {
ctx.tellFailure(msg, e);
ctx.tellFailure(tbMsg, e);
}
}

View File

@ -72,13 +72,13 @@ public class TbSendEmailNode extends TbAbstractExternalNode {
try {
validateType(msg.getType());
TbEmail email = getEmail(msg);
var tbMsg = ackIfNeeded(ctx, msg);
withCallback(ctx.getMailExecutor().executeAsync(() -> {
sendEmail(ctx, msg, email);
sendEmail(ctx, tbMsg, email);
return null;
}),
ok -> tellSuccess(ctx, msg),
fail -> tellFailure(ctx, msg, fail));
ackIfNeeded(ctx, msg);
ok -> tellSuccess(ctx, tbMsg),
fail -> tellFailure(ctx, tbMsg, fail));
} catch (Exception ex) {
ctx.tellFailure(msg, ex);
}

View File

@ -80,16 +80,16 @@ public class TbMqttNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg);
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
var tbMsg = ackIfNeeded(ctx, msg);
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(tbMsg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage())
.addListener(future -> {
if (future.isSuccess()) {
tellSuccess(ctx, msg);
tellSuccess(ctx, tbMsg);
} else {
tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause());
tellFailure(ctx, processException(ctx, tbMsg, future.cause()), future.cause());
}
}
);
ackIfNeeded(ctx, msg);
}
private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) {

View File

@ -71,16 +71,17 @@ public class TbNotificationNode extends TbAbstractExternalNode {
.originatorEntityId(ctx.getSelf().getRuleChainId())
.build();
var tbMsg = ackIfNeeded(ctx, msg);
DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() ->
ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> {
TbMsgMetaData metaData = msg.getMetaData().copy();
TbMsgMetaData metaData = tbMsg.getMetaData().copy();
metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
tellSuccess(ctx, TbMsg.transformMsg(msg, metaData));
tellSuccess(ctx, TbMsg.transformMsg(tbMsg, metaData));
})),
r -> {
},
e -> tellFailure(ctx, msg, e));
ackIfNeeded(ctx, msg);
e -> tellFailure(ctx, tbMsg, e));
}
}

View File

@ -61,12 +61,12 @@ public class TbSlackNode extends TbAbstractExternalNode {
}
String message = TbNodeUtils.processPattern(config.getMessageTemplate(), msg);
var tbMsg = ackIfNeeded(ctx, msg);
DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> {
ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message);
}),
r -> tellSuccess(ctx, msg),
e -> tellFailure(ctx, msg, e));
ackIfNeeded(ctx, msg);
r -> tellSuccess(ctx, tbMsg),
e -> tellFailure(ctx, tbMsg, e));
}
}

View File

@ -84,10 +84,10 @@ public class TbRabbitMqNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(publishMessageAsync(ctx, msg),
var tbMsg = ackIfNeeded(ctx, msg);
withCallback(publishMessageAsync(ctx, tbMsg),
m -> tellSuccess(ctx, m),
t -> tellFailure(ctx, processException(ctx, msg, t), t));
ackIfNeeded(ctx, msg);
t -> tellFailure(ctx, processException(ctx, tbMsg, t), t));
}
private ListenableFuture<TbMsg> publishMessageAsync(TbContext ctx, TbMsg msg) {

View File

@ -60,10 +60,10 @@ public class TbRestApiCallNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
httpClient.processMessage(ctx, msg,
var tbMsg = ackIfNeeded(ctx, msg);
httpClient.processMessage(ctx, tbMsg,
m -> tellSuccess(ctx, m),
(m, t) -> tellFailure(ctx, m, t));
ackIfNeeded(ctx, msg);
}
@Override

View File

@ -60,16 +60,16 @@ public class TbSendSmsNode extends TbAbstractExternalNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var tbMsg = ackIfNeeded(ctx, msg);
try {
withCallback(ctx.getSmsExecutor().executeAsync(() -> {
sendSms(ctx, msg);
sendSms(ctx, tbMsg);
return null;
}),
ok -> tellSuccess(ctx, msg),
fail -> tellFailure(ctx, msg, fail));
ackIfNeeded(ctx, msg);
ok -> tellSuccess(ctx, tbMsg),
fail -> tellFailure(ctx, tbMsg, fail));
} catch (Exception ex) {
ctx.tellFailure(msg, ex);
ctx.tellFailure(tbMsg, ex);
}
}