fixed rewriting ttl for existing nodes

This commit is contained in:
IrynaMatveieva 2024-07-01 10:22:40 +03:00
parent 88a768f067
commit 4307d6f13a
3 changed files with 73 additions and 15 deletions

View File

@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function; import com.google.common.base.Function;
@ -88,7 +90,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
private PreparedStatement saveStmt; private PreparedStatement saveStmt;
private ExecutorService readResultsProcessingExecutor; private ExecutorService readResultsProcessingExecutor;
private Map<String, String> fieldsMap; private Map<String, String> fieldsMap;
private long ttl; private Integer ttl;
@Override @Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
@ -97,6 +99,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
if (cassandraCluster == null) { if (cassandraCluster == null) {
throw new TbNodeException("Unable to connect to Cassandra database", true); throw new TbNodeException("Unable to connect to Cassandra database", true);
} }
if (!isTableExists()) {
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.", true);
}
ctx.addTenantProfileListener(this::onTenantProfileUpdate); ctx.addTenantProfileListener(this::onTenantProfileUpdate);
onTenantProfileUpdate(ctx.getTenantProfile()); onTenantProfileUpdate(ctx.getTenantProfile());
startExecutor(); startExecutor();
@ -106,8 +111,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
private void onTenantProfileUpdate(TenantProfile tenantProfile) { private void onTenantProfileUpdate(TenantProfile tenantProfile) {
DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
ttl = config.getDefaultTTL(); ttl = config.getDefaultTTL();
if (ttl == 0L) { if (ttl != null && ttl == 0) {
ttl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); ttl = (int) TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
} }
} }
@ -132,6 +137,15 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
} }
} }
private boolean isTableExists() {
KeyspaceMetadata keyspaceMetadata = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()).orElse(null);
if (keyspaceMetadata != null) {
TableMetadata tableMetadata = keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).orElse(null);
return tableMetadata != null;
}
return false;
}
private PreparedStatement prepare(String query) { private PreparedStatement prepare(String query) {
return getSession().prepare(query); return getSession().prepare(query);
} }
@ -180,7 +194,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
query.append("?, "); query.append("?, ");
} }
} }
if (ttl > 0) { if (ttl != null && ttl > 0) {
query.append(" USING TTL ?"); query.append(" USING TTL ?");
} }
return query.toString(); return query.toString();
@ -224,8 +238,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
} }
i.getAndIncrement(); i.getAndIncrement();
}); });
if (ttl > 0) { if (ttl != null && ttl > 0) {
stmtBuilder.setInt(i.get(), (int) ttl); stmtBuilder.setInt(i.get(), ttl);
} }
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
} }
@ -274,7 +288,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
case 0: case 0:
if (!oldConfiguration.has("defaultTTL")) { if (!oldConfiguration.has("defaultTTL")) {
hasChanges = true; hasChanges = true;
((ObjectNode) oldConfiguration).put("defaultTTL", 0); ((ObjectNode) oldConfiguration).putNull("defaultTTL");
} }
break; break;
default: default:

View File

@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
private String tableName; private String tableName;
private Map<String, String> fieldsMapping; private Map<String, String> fieldsMapping;
private long defaultTTL; private Integer defaultTTL;
@Override @Override
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
configuration.setDefaultTTL(0L); configuration.setDefaultTTL(0);
configuration.setTableName(""); configuration.setTableName("");
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
map.put("", ""); map.put("", "");

View File

@ -21,7 +21,10 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -61,6 +64,7 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -114,6 +118,12 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
private ProtocolVersion protocolVersionMock; private ProtocolVersion protocolVersionMock;
@Mock @Mock
private Node nodeMock; private Node nodeMock;
@Mock
private Metadata metadataMock;
@Mock
private KeyspaceMetadata keyspaceMetadataMock;
@Mock
private TableMetadata tableMetadataMock;
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
@ -130,7 +140,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
public void verifyDefaultConfig() { public void verifyDefaultConfig() {
assertThat(config.getTableName()).isEqualTo(""); assertThat(config.getTableName()).isEqualTo("");
assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", ""));
assertThat(config.getDefaultTTL()).isEqualTo(0L); assertThat(config.getDefaultTTL()).isEqualTo(0);
} }
@Test @Test
@ -143,12 +153,28 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
.isEqualTo(true); .isEqualTo(true);
} }
@Test
public void givenTableDoesNotExist_whenInit_thenThrowsException() {
config.setTableName("test_table");
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
mockCassandraCluster();
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty());
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
}
@Test @Test
public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() {
config.setTableName("test_table");
config.setFieldsMapping(emptyMap()); config.setFieldsMapping(emptyMap());
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); mockCassandraCluster();
given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5));
assertThatThrownBy(() -> node.init(ctxMock, configuration)) assertThatThrownBy(() -> node.init(ctxMock, configuration))
@ -215,7 +241,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
public void givenTtl_whenOnMsg_thenVerifyStatement(long ttlFromConfig, public void givenTtl_whenOnMsg_thenVerifyStatement(Integer ttlFromConfig,
int ttlFromTenantProfileInDays, int ttlFromTenantProfileInDays,
String expectedQuery, String expectedQuery,
Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException { Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException {
@ -251,13 +277,17 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
(Consumer<BoundStatementBuilder>) builder -> { (Consumer<BoundStatementBuilder>) builder -> {
then(builder).should().setInt(1, 20); then(builder).should().setInt(1, 20);
}),
Arguments.of(null, 2, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
(Consumer<BoundStatementBuilder>) builder -> {
then(builder).should(never()).setInt(anyInt(), anyInt());
}) })
); );
} }
@Test @Test
public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException {
config.setDefaultTTL(25L); config.setDefaultTTL(25);
config.setTableName("readings"); config.setTableName("readings");
Map<String, String> mappings = Map.of( Map<String, String> mappings = Map.of(
"$entityId", "entityIdTableColumn", "$entityId", "entityIdTableColumn",
@ -311,19 +341,33 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
Arguments.of(0, Arguments.of(0,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}",
true, true,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":null}"
),
// default config for version 1 with upgrade from version 1
Arguments.of(1,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}",
false,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}" "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}"
) )
); );
} }
private void mockOnInit() { private void mockOnInit() {
given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); mockCassandraCluster();
given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5));
given(cassandraClusterMock.getSession()).willReturn(sessionMock);
given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE);
given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock);
} }
private void mockCassandraCluster() {
given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock);
given(cassandraClusterMock.getSession()).willReturn(sessionMock);
given(sessionMock.getMetadata()).willReturn(metadataMock);
given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace");
given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock));
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock));
}
private void mockBoundStatement() { private void mockBoundStatement() {
given(preparedStatementMock.bind()).willReturn(boundStatementMock); given(preparedStatementMock.bind()).willReturn(boundStatementMock);
given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock); given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock);