diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 1069f9d6e6..36767ca2d8 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -173,6 +173,8 @@ cassandra: callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" + # set all data types values except target to null for the same ts on save + set_null_values_enabled: "${CASSANDRA_QUERY_SET_NULL_VALUES_ENABLED:false}" tenant_rate_limits: enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" configuration: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_CONFIGURATION:1000:1,30000:60}" diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index b46c255e08..572c5d28d8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -94,6 +94,9 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Value("${cassandra.query.ts_key_value_ttl}") private long systemTtl; + @Value("${cassandra.query.set_null_values_enabled}") + private boolean setNullValuesEnabled; + private TsPartitionDate tsFormat; private PreparedStatement partitionInsertStmt; @@ -307,9 +310,13 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { + List> futures = new ArrayList<>(); ttl = computeTtl(ttl); long partition = toPartitionTs(tsKvEntry.getTs()); DataType type = tsKvEntry.getDataType(); + if (setNullValuesEnabled) { + processSetNullValues(tenantId, entityId, tsKvEntry, ttl, futures, partition, type); + } BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); stmt.setString(0, entityId.getEntityType().name()) .setUUID(1, entityId.getId()) @@ -320,6 +327,46 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem if (ttl > 0) { stmt.setInt(6, (int) ttl); } + futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null)); + return Futures.transform(Futures.allAsList(futures), result -> null); + } + + private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List> futures, long partition, DataType type) { + switch (type) { + case LONG: + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); + break; + case BOOLEAN: + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); + break; + case DOUBLE: + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.STRING)); + break; + case STRING: + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.BOOLEAN)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.DOUBLE)); + futures.add(saveNull(tenantId, entityId, tsKvEntry, ttl, partition, DataType.LONG)); + break; + } + } + + private ListenableFuture saveNull(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, long partition, DataType type) { + BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind(); + stmt.setString(0, entityId.getEntityType().name()) + .setUUID(1, entityId.getId()) + .setString(2, tsKvEntry.getKey()) + .setLong(3, partition) + .setLong(4, tsKvEntry.getTs()); + stmt.setToNull(getColumnName(type)); + if (ttl > 0) { + stmt.setInt(6, (int) ttl); + } return getFuture(executeAsyncWrite(tenantId, stmt), rs -> null); } diff --git a/dao/src/test/resources/cassandra-test.properties b/dao/src/test/resources/cassandra-test.properties index 182a10b07d..151688c41c 100644 --- a/dao/src/test/resources/cassandra-test.properties +++ b/dao/src/test/resources/cassandra-test.properties @@ -53,6 +53,7 @@ cassandra.query.buffer_size=100000 cassandra.query.concurrent_limit=1000 cassandra.query.permit_max_wait_time=20000 cassandra.query.rate_limit_print_interval_ms=30000 +cassandra.query.set_null_values_enabled=false cassandra.query.tenant_rate_limits.enabled=false cassandra.query.tenant_rate_limits.configuration=5000:1,100000:60 cassandra.query.tenant_rate_limits.print_tenant_names=false