Merge pull request #14033 from dskarzh/rule-node/save-to-custom-table/remove-executor

Save to custom table node: remove redundant executor per node
This commit is contained in:
Viacheslav Klimov 2025-09-22 12:35:48 +03:00 committed by GitHub
commit 8390760698
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 35 deletions

View File

@ -16,21 +16,16 @@
package org.thingsboard.rule.engine.action;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -50,8 +45,6 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@ -82,7 +75,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
private CassandraCluster cassandraCluster;
private ConsistencyLevel defaultWriteLevel;
private PreparedStatement saveStmt;
private ExecutorService readResultsProcessingExecutor;
private Map<String, String> fieldsMap;
@Override
@ -95,31 +87,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
if (!isTableExists()) {
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
}
startExecutor();
saveStmt = getSaveStmt();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
withCallback(save(msg, ctx), success -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
}
@Override
public void destroy() {
stopExecutor();
saveStmt = null;
}
private void startExecutor() {
readResultsProcessingExecutor = Executors.newCachedThreadPool();
}
private void stopExecutor() {
if (readResultsProcessingExecutor != null) {
readResultsProcessingExecutor.shutdownNow();
}
}
private boolean isTableExists() {
var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName());
return keyspaceMdOpt.map(keyspaceMetadata ->
@ -180,7 +160,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
return query.toString();
}
private ListenableFuture<Void> save(TbMsg msg, TbContext ctx) {
private TbResultSetFuture save(TbMsg msg, TbContext ctx) {
JsonElement data = JsonParser.parseString(msg.getData());
if (!data.isJsonObject()) {
throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data);
@ -221,7 +201,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
if (config.getDefaultTtl() > 0) {
stmtBuilder.setInt(i.get(), config.getDefaultTtl());
}
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
return executeAsyncWrite(ctx, stmtBuilder.build());
}
}
@ -251,16 +231,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
}
}
private <T> ListenableFuture<T> getFuture(TbResultSetFuture future, java.util.function.Function<AsyncResultSet, T> transformer) {
return Futures.transform(future, new Function<AsyncResultSet, T>() {
@Nullable
@Override
public T apply(@Nullable AsyncResultSet input) {
return transformer.apply(input);
}
}, readResultsProcessingExecutor);
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;

View File

@ -24,12 +24,10 @@ import java.util.Map;
@Data
public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfiguration<TbSaveToCustomCassandraTableNodeConfiguration> {
private String tableName;
private Map<String, String> fieldsMapping;
private int defaultTtl;
@Override
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
@ -40,4 +38,5 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
configuration.setFieldsMapping(map);
return configuration;
}
}