diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index e1aa1f6a13..fcaf2420e9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -88,7 +90,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private PreparedStatement saveStmt; private ExecutorService readResultsProcessingExecutor; private Map fieldsMap; - private long ttl; + private Integer ttl; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -97,6 +99,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { if (cassandraCluster == null) { throw new TbNodeException("Unable to connect to Cassandra database", true); } + if (!isTableExists()) { + throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.", true); + } ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); startExecutor(); @@ -106,8 +111,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); ttl = config.getDefaultTTL(); - if (ttl == 0L) { - ttl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); + if (ttl != null && ttl == 0) { + ttl = (int) TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } } @@ -132,6 +137,15 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + private boolean isTableExists() { + KeyspaceMetadata keyspaceMetadata = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()).orElse(null); + if (keyspaceMetadata != null) { + TableMetadata tableMetadata = keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).orElse(null); + return tableMetadata != null; + } + return false; + } + private PreparedStatement prepare(String query) { return getSession().prepare(query); } @@ -180,7 +194,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } - if (ttl > 0) { + if (ttl != null && ttl > 0) { query.append(" USING TTL ?"); } return query.toString(); @@ -224,8 +238,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } i.getAndIncrement(); }); - if (ttl > 0) { - stmtBuilder.setInt(i.get(), (int) ttl); + if (ttl != null && ttl > 0) { + stmtBuilder.setInt(i.get(), ttl); } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } @@ -274,7 +288,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { case 0: if (!oldConfiguration.has("defaultTTL")) { hasChanges = true; - ((ObjectNode) oldConfiguration).put("defaultTTL", 0); + ((ObjectNode) oldConfiguration).putNull("defaultTTL"); } break; default: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 8d3d78e8b9..3efae275da 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; - private long defaultTTL; + private Integer defaultTTL; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); - configuration.setDefaultTTL(0L); + configuration.setDefaultTTL(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index b3f3f2caa4..d691f1eaa4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -21,7 +21,10 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.google.common.util.concurrent.SettableFuture; import org.junit.jupiter.api.AfterEach; @@ -61,6 +64,7 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -114,6 +118,12 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad private ProtocolVersion protocolVersionMock; @Mock private Node nodeMock; + @Mock + private Metadata metadataMock; + @Mock + private KeyspaceMetadata keyspaceMetadataMock; + @Mock + private TableMetadata tableMetadataMock; @BeforeEach public void setUp() { @@ -130,7 +140,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad public void verifyDefaultConfig() { assertThat(config.getTableName()).isEqualTo(""); assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); - assertThat(config.getDefaultTTL()).isEqualTo(0L); + assertThat(config.getDefaultTTL()).isEqualTo(0); } @Test @@ -143,12 +153,28 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad .isEqualTo(true); } + @Test + public void givenTableDoesNotExist_whenInit_thenThrowsException() { + config.setTableName("test_table"); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty()); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + @Test public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + config.setTableName("test_table"); config.setFieldsMapping(emptyMap()); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + mockCassandraCluster(); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) @@ -215,7 +241,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad @ParameterizedTest @MethodSource - public void givenTtl_whenOnMsg_thenVerifyStatement(long ttlFromConfig, + public void givenTtl_whenOnMsg_thenVerifyStatement(Integer ttlFromConfig, int ttlFromTenantProfileInDays, String expectedQuery, Consumer verifyBuilder) throws TbNodeException { @@ -251,13 +277,17 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", (Consumer) builder -> { then(builder).should().setInt(1, 20); + }), + Arguments.of(null, 2, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", + (Consumer) builder -> { + then(builder).should(never()).setInt(anyInt(), anyInt()); }) ); } @Test public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { - config.setDefaultTTL(25L); + config.setDefaultTTL(25); config.setTableName("readings"); Map mappings = Map.of( "$entityId", "entityIdTableColumn", @@ -311,19 +341,33 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(0, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", true, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":null}" + ), + // default config for version 1 with upgrade from version 1 + Arguments.of(1, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}", + false, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}" ) ); } private void mockOnInit() { - given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + mockCassandraCluster(); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); - given(cassandraClusterMock.getSession()).willReturn(sessionMock); given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); } + private void mockCassandraCluster() { + given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + given(cassandraClusterMock.getSession()).willReturn(sessionMock); + given(sessionMock.getMetadata()).willReturn(metadataMock); + given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace"); + given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock)); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock)); + } + private void mockBoundStatement() { given(preparedStatementMock.bind()).willReturn(boundStatementMock); given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock);