diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index e56a3459f8..ebc274a9e0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -16,21 +16,16 @@ package org.thingsboard.rule.engine.action; import com.datastax.oss.driver.api.core.ConsistencyLevel; -import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; -import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -50,8 +45,6 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.common.util.DonAsynchron.withCallback; @@ -82,7 +75,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private CassandraCluster cassandraCluster; private ConsistencyLevel defaultWriteLevel; private PreparedStatement saveStmt; - private ExecutorService readResultsProcessingExecutor; private Map fieldsMap; @Override @@ -95,31 +87,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { if (!isTableExists()) { throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster."); } - startExecutor(); saveStmt = getSaveStmt(); } @Override public void onMsg(TbContext ctx, TbMsg msg) { - withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); + withCallback(save(msg, ctx), success -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); } @Override public void destroy() { - stopExecutor(); saveStmt = null; } - private void startExecutor() { - readResultsProcessingExecutor = Executors.newCachedThreadPool(); - } - - private void stopExecutor() { - if (readResultsProcessingExecutor != null) { - readResultsProcessingExecutor.shutdownNow(); - } - } - private boolean isTableExists() { var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()); return keyspaceMdOpt.map(keyspaceMetadata -> @@ -180,7 +160,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { return query.toString(); } - private ListenableFuture save(TbMsg msg, TbContext ctx) { + private TbResultSetFuture save(TbMsg msg, TbContext ctx) { JsonElement data = JsonParser.parseString(msg.getData()); if (!data.isJsonObject()) { throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data); @@ -221,7 +201,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { if (config.getDefaultTtl() > 0) { stmtBuilder.setInt(i.get(), config.getDefaultTtl()); } - return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); + return executeAsyncWrite(ctx, stmtBuilder.build()); } } @@ -251,16 +231,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } - private ListenableFuture getFuture(TbResultSetFuture future, java.util.function.Function transformer) { - return Futures.transform(future, new Function() { - @Nullable - @Override - public T apply(@Nullable AsyncResultSet input) { - return transformer.apply(input); - } - }, readResultsProcessingExecutor); - } - @Override public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { boolean hasChanges = false; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 48e0f6d679..7b71c8f705 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -24,12 +24,10 @@ import java.util.Map; @Data public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfiguration { - private String tableName; private Map fieldsMapping; private int defaultTtl; - @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); @@ -40,4 +38,5 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig configuration.setFieldsMapping(map); return configuration; } + }