Merge branch 'master' of github.com:thingsboard/thingsboard into feature/2fa-enforce
This commit is contained in:
commit
b71baed1d1
@ -37,7 +37,7 @@ public class UserFields extends AbstractEntityFields {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return super.getEmail();
|
||||
return getEmail();
|
||||
}
|
||||
|
||||
public UserFields(UUID id, long createdTime, UUID tenantId, UUID customerId,
|
||||
|
||||
@ -16,21 +16,16 @@
|
||||
package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.datastax.oss.driver.api.core.ConsistencyLevel;
|
||||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import com.google.gson.JsonPrimitive;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
@ -50,8 +45,6 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
||||
@ -82,7 +75,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
private CassandraCluster cassandraCluster;
|
||||
private ConsistencyLevel defaultWriteLevel;
|
||||
private PreparedStatement saveStmt;
|
||||
private ExecutorService readResultsProcessingExecutor;
|
||||
private Map<String, String> fieldsMap;
|
||||
|
||||
@Override
|
||||
@ -95,31 +87,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
if (!isTableExists()) {
|
||||
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
|
||||
}
|
||||
startExecutor();
|
||||
saveStmt = getSaveStmt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
||||
withCallback(save(msg, ctx), success -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
stopExecutor();
|
||||
saveStmt = null;
|
||||
}
|
||||
|
||||
private void startExecutor() {
|
||||
readResultsProcessingExecutor = Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
private void stopExecutor() {
|
||||
if (readResultsProcessingExecutor != null) {
|
||||
readResultsProcessingExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isTableExists() {
|
||||
var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName());
|
||||
return keyspaceMdOpt.map(keyspaceMetadata ->
|
||||
@ -180,7 +160,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
return query.toString();
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> save(TbMsg msg, TbContext ctx) {
|
||||
private TbResultSetFuture save(TbMsg msg, TbContext ctx) {
|
||||
JsonElement data = JsonParser.parseString(msg.getData());
|
||||
if (!data.isJsonObject()) {
|
||||
throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data);
|
||||
@ -221,7 +201,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
if (config.getDefaultTtl() > 0) {
|
||||
stmtBuilder.setInt(i.get(), config.getDefaultTtl());
|
||||
}
|
||||
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
|
||||
return executeAsyncWrite(ctx, stmtBuilder.build());
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,16 +231,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
||||
}
|
||||
}
|
||||
|
||||
private <T> ListenableFuture<T> getFuture(TbResultSetFuture future, java.util.function.Function<AsyncResultSet, T> transformer) {
|
||||
return Futures.transform(future, new Function<AsyncResultSet, T>() {
|
||||
@Nullable
|
||||
@Override
|
||||
public T apply(@Nullable AsyncResultSet input) {
|
||||
return transformer.apply(input);
|
||||
}
|
||||
}, readResultsProcessingExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
||||
boolean hasChanges = false;
|
||||
|
||||
@ -24,12 +24,10 @@ import java.util.Map;
|
||||
@Data
|
||||
public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfiguration<TbSaveToCustomCassandraTableNodeConfiguration> {
|
||||
|
||||
|
||||
private String tableName;
|
||||
private Map<String, String> fieldsMapping;
|
||||
private int defaultTtl;
|
||||
|
||||
|
||||
@Override
|
||||
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
|
||||
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
|
||||
@ -40,4 +38,5 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
|
||||
configuration.setFieldsMapping(map);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user