removed using TTL from tenant profile
This commit is contained in:
parent
31c8707658
commit
65f94878d2
@ -38,10 +38,8 @@ import org.thingsboard.rule.engine.api.TbNode;
|
|||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
|
||||||
import org.thingsboard.server.common.data.util.TbPair;
|
import org.thingsboard.server.common.data.util.TbPair;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
||||||
@ -54,7 +52,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
import static org.thingsboard.common.util.DonAsynchron.withCallback;
|
||||||
@ -88,7 +85,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
|||||||
private PreparedStatement saveStmt;
|
private PreparedStatement saveStmt;
|
||||||
private ExecutorService readResultsProcessingExecutor;
|
private ExecutorService readResultsProcessingExecutor;
|
||||||
private Map<String, String> fieldsMap;
|
private Map<String, String> fieldsMap;
|
||||||
private Integer ttl;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
@ -100,20 +96,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
|||||||
if (!isTableExists()) {
|
if (!isTableExists()) {
|
||||||
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
|
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
|
||||||
}
|
}
|
||||||
ctx.addTenantProfileListener(this::onTenantProfileUpdate);
|
|
||||||
onTenantProfileUpdate(ctx.getTenantProfile());
|
|
||||||
startExecutor();
|
startExecutor();
|
||||||
saveStmt = getSaveStmt();
|
saveStmt = getSaveStmt();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onTenantProfileUpdate(TenantProfile tenantProfile) {
|
|
||||||
DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
|
||||||
ttl = config.getDefaultTTL();
|
|
||||||
if (ttl != null && ttl == 0) {
|
|
||||||
ttl = (int) TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||||
withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor());
|
||||||
@ -189,7 +175,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
|||||||
query.append("?, ");
|
query.append("?, ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ttl != null && ttl > 0) {
|
if (config.getDefaultTtL() > 0) {
|
||||||
query.append(" USING TTL ?");
|
query.append(" USING TTL ?");
|
||||||
}
|
}
|
||||||
return query.toString();
|
return query.toString();
|
||||||
@ -233,8 +219,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
|||||||
}
|
}
|
||||||
i.getAndIncrement();
|
i.getAndIncrement();
|
||||||
});
|
});
|
||||||
if (ttl != null && ttl > 0) {
|
if (config.getDefaultTtL() > 0) {
|
||||||
stmtBuilder.setInt(i.get(), ttl);
|
stmtBuilder.setInt(i.get(), config.getDefaultTtL());
|
||||||
}
|
}
|
||||||
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
|
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
|
||||||
}
|
}
|
||||||
@ -281,9 +267,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
|
|||||||
boolean hasChanges = false;
|
boolean hasChanges = false;
|
||||||
switch (fromVersion) {
|
switch (fromVersion) {
|
||||||
case 0:
|
case 0:
|
||||||
if (!oldConfiguration.has("defaultTTL")) {
|
if (!oldConfiguration.has("defaultTtL")) {
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
((ObjectNode) oldConfiguration).putNull("defaultTTL");
|
((ObjectNode) oldConfiguration).put("defaultTtL", 0);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|||||||
@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
|
|||||||
|
|
||||||
private String tableName;
|
private String tableName;
|
||||||
private Map<String, String> fieldsMapping;
|
private Map<String, String> fieldsMapping;
|
||||||
private Integer defaultTTL;
|
private int defaultTtL;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
|
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
|
||||||
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
|
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
|
||||||
configuration.setDefaultTTL(0);
|
configuration.setDefaultTtL(0);
|
||||||
configuration.setTableName("");
|
configuration.setTableName("");
|
||||||
Map<String, String> map = new HashMap<>();
|
Map<String, String> map = new HashMap<>();
|
||||||
map.put("", "");
|
map.put("", "");
|
||||||
|
|||||||
@ -75,7 +75,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
onTenantProfileUpdate(ctx.getTenantProfile());
|
onTenantProfileUpdate(ctx.getTenantProfile());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onTenantProfileUpdate(TenantProfile tenantProfile) {
|
void onTenantProfileUpdate(TenantProfile tenantProfile) {
|
||||||
DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
||||||
tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
|
tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,13 +46,9 @@ import org.thingsboard.rule.engine.api.TbContext;
|
|||||||
import org.thingsboard.rule.engine.api.TbNode;
|
import org.thingsboard.rule.engine.api.TbNode;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.id.TenantProfileId;
|
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
|
||||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
||||||
@ -91,7 +87,6 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
|
|
||||||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7"));
|
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7"));
|
||||||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e"));
|
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e"));
|
||||||
private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("239af5ad-53c1-4179-802f-3821f92ed338"));
|
|
||||||
|
|
||||||
private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor();
|
private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor();
|
||||||
|
|
||||||
@ -140,7 +135,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
public void verifyDefaultConfig() {
|
public void verifyDefaultConfig() {
|
||||||
assertThat(config.getTableName()).isEqualTo("");
|
assertThat(config.getTableName()).isEqualTo("");
|
||||||
assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", ""));
|
assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", ""));
|
||||||
assertThat(config.getDefaultTTL()).isEqualTo(0);
|
assertThat(config.getDefaultTtL()).isEqualTo(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -175,7 +170,6 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
||||||
|
|
||||||
mockCassandraCluster();
|
mockCassandraCluster();
|
||||||
given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5));
|
|
||||||
|
|
||||||
assertThatThrownBy(() -> node.init(ctxMock, configuration))
|
assertThatThrownBy(() -> node.init(ctxMock, configuration))
|
||||||
.isInstanceOf(TbNodeException.class)
|
.isInstanceOf(TbNodeException.class)
|
||||||
@ -241,16 +235,14 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource
|
@MethodSource
|
||||||
public void givenTtl_whenOnMsg_thenVerifyStatement(Integer ttlFromConfig,
|
public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig,
|
||||||
int ttlFromTenantProfileInDays,
|
|
||||||
String expectedQuery,
|
String expectedQuery,
|
||||||
Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException {
|
Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException {
|
||||||
config.setTableName("readings");
|
config.setTableName("readings");
|
||||||
config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn"));
|
config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn"));
|
||||||
config.setDefaultTTL(ttlFromConfig);
|
config.setDefaultTtL(ttlFromConfig);
|
||||||
|
|
||||||
mockOnInit();
|
mockOnInit();
|
||||||
given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(ttlFromTenantProfileInDays));
|
|
||||||
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
|
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
|
||||||
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
|
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
|
||||||
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
|
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
|
||||||
@ -266,28 +258,20 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
|
|
||||||
private static Stream<Arguments> givenTtl_whenOnMsg_thenVerifyStatement() {
|
private static Stream<Arguments> givenTtl_whenOnMsg_thenVerifyStatement() {
|
||||||
return Stream.of(
|
return Stream.of(
|
||||||
Arguments.of(0, 0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
|
Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
|
||||||
(Consumer<BoundStatementBuilder>) builder -> {
|
(Consumer<BoundStatementBuilder>) builder -> {
|
||||||
then(builder).should(never()).setInt(anyInt(), anyInt());
|
then(builder).should(never()).setInt(anyInt(), anyInt());
|
||||||
}),
|
}),
|
||||||
Arguments.of(0, 5, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
|
Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
|
||||||
(Consumer<BoundStatementBuilder>) builder -> {
|
|
||||||
then(builder).should().setInt(1, 432000);
|
|
||||||
}),
|
|
||||||
Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
|
|
||||||
(Consumer<BoundStatementBuilder>) builder -> {
|
(Consumer<BoundStatementBuilder>) builder -> {
|
||||||
then(builder).should().setInt(1, 20);
|
then(builder).should().setInt(1, 20);
|
||||||
}),
|
|
||||||
Arguments.of(null, 2, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
|
|
||||||
(Consumer<BoundStatementBuilder>) builder -> {
|
|
||||||
then(builder).should(never()).setInt(anyInt(), anyInt());
|
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException {
|
public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException {
|
||||||
config.setDefaultTTL(25);
|
config.setDefaultTtL(25);
|
||||||
config.setTableName("readings");
|
config.setTableName("readings");
|
||||||
Map<String, String> mappings = Map.of(
|
Map<String, String> mappings = Map.of(
|
||||||
"$entityId", "entityIdTableColumn",
|
"$entityId", "entityIdTableColumn",
|
||||||
@ -341,20 +325,19 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
Arguments.of(0,
|
Arguments.of(0,
|
||||||
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}",
|
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}",
|
||||||
true,
|
true,
|
||||||
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":null}"
|
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}"
|
||||||
),
|
),
|
||||||
// default config for version 1 with upgrade from version 1
|
// default config for version 1 with upgrade from version 1
|
||||||
Arguments.of(1,
|
Arguments.of(1,
|
||||||
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}",
|
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}",
|
||||||
false,
|
false,
|
||||||
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}"
|
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}"
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mockOnInit() {
|
private void mockOnInit() {
|
||||||
mockCassandraCluster();
|
mockCassandraCluster();
|
||||||
given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5));
|
|
||||||
given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE);
|
given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE);
|
||||||
given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock);
|
given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock);
|
||||||
}
|
}
|
||||||
@ -410,14 +393,4 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad
|
|||||||
then(boundStatementBuilderMock).should().setInt(values.size(), 25);
|
then(boundStatementBuilderMock).should().setInt(values.size(), 25);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TenantProfile getTenantProfileWithTtl(int ttlInDays) {
|
|
||||||
var tenantProfile = new TenantProfile(TENANT_PROFILE_ID);
|
|
||||||
var tenantProfileData = new TenantProfileData();
|
|
||||||
var tenantProfileConfiguration = new DefaultTenantProfileConfiguration();
|
|
||||||
tenantProfileConfiguration.setDefaultStorageTtlDays(ttlInDays);
|
|
||||||
tenantProfileData.setConfiguration(tenantProfileConfiguration);
|
|
||||||
tenantProfile.setProfileData(tenantProfileData);
|
|
||||||
return tenantProfile;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user