From ada9f8b7ebd422a6918b5abcc7d11659bcdce046 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 16 Apr 2024 15:40:34 +0300 Subject: [PATCH 01/21] added tests for save to custom table node --- .../TbSaveToCustomCassandraTableNode.java | 8 +- .../TbSaveToCustomCassandraTableNodeTest.java | 248 ++++++++++++++++++ 2 files changed, 254 insertions(+), 2 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 4b957db9cd..1e55e9d320 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -173,7 +173,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); } else { JsonObject dataAsObject = data.getAsJsonObject(); - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind()); + BoundStatementBuilder stmtBuilder = getStmtBuilder(); AtomicInteger i = new AtomicInteger(0); fieldsMap.forEach((key, value) -> { if (key.equals(ENTITY_ID)) { @@ -198,7 +198,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } else if (dataKeyElement.isJsonObject()) { stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString()); } else { - throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!"); + throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!"); } } else { throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!"); @@ -209,6 +209,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + BoundStatementBuilder getStmtBuilder() { + return new BoundStatementBuilder(saveStmt.bind()); + } + private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) { return executeAsync(ctx, statement, defaultWriteLevel); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java new file mode 100644 index 0000000000..699af15fa3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -0,0 +1,248 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; + +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.cassandra.guava.GuavaSession; +import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.nosql.TbResultSet; +import org.thingsboard.server.dao.nosql.TbResultSetFuture; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbSaveToCustomCassandraTableNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); + + private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); + + private TbSaveToCustomCassandraTableNode node; + private TbSaveToCustomCassandraTableNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private CassandraCluster cassandraClusterMock; + @Mock + private GuavaSession sessionMock; + @Mock + private PreparedStatement preparedStatementMock; + @Mock + private BoundStatement boundStatementMock; + @Mock + private BoundStatementBuilder boundStatementBuilderMock; + @Mock + private ColumnDefinitions columnDefinitionsMock; + @Mock + private CodecRegistry codecRegistryMock; + @Mock + private ProtocolVersion protocolVersionMock; + @Mock + private Node nodeMock; + + @Test + void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Unable to connect to Cassandra database"); + } + + @Test + void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setFieldsMapping(emptyMap()); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + when(ctxMock.getCassandraCluster()).thenReturn(cassandraClusterMock); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Fields(key,value) map is empty!"); + } + + @Test + void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + + node.init(ctxMock, configuration); + + String data = ""; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid message structure, it is not a JSON Object:" + null); + } + + @Test + void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + when(preparedStatementMock.bind()).thenReturn(boundStatementMock); + when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); + when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); + when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); + when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); + when(boundStatementMock.getNode()).thenReturn(nodeMock); + + node.init(ctxMock, configuration); + + String data = """ + { + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);; + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data doesn't contain key: 'temp'!"); + } + + @Test + void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + when(preparedStatementMock.bind()).thenReturn(boundStatementMock); + when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); + when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); + when(boundStatementMock.getValues()).thenReturn(List.of(ByteBuffer.allocate(1))); + when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); + when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); + when(boundStatementMock.getNode()).thenReturn(nodeMock); + + node.init(ctxMock, configuration); + + String data = """ + { + "temp": [value] + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!"); + } + + @Test + void givenValidMsgStructure_whenOnMsg_thenOk() throws Exception { + node = spy(new TbSaveToCustomCassandraTableNode()); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + doAnswer(invocation -> boundStatementBuilderMock).when(node).getStmtBuilder(); + when(boundStatementBuilderMock.setLong(anyInt(), anyLong())).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.build()).thenReturn(boundStatementMock); + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + doAnswer(invocation -> { + SettableFuture mainFuture = SettableFuture.create(); + mainFuture.set(new TbResultSet(null, null, null)); + TbResultSetFuture value = new TbResultSetFuture(mainFuture); + return value; + }).when(ctxMock).submitCassandraWriteTask(any()); + when(ctxMock.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + + node.init(ctxMock, configuration); + + String data = """ + { + "temp": 31, + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + node.onMsg(ctxMock, msg); + + verify(sessionMock).prepare(eq("INSERT INTO cs_tb_temperature_sensor(temperature) VALUES(?)")); + verify(ctxMock).submitCassandraWriteTask(any(CassandraStatementTask.class)); + verify(ctxMock).tellSuccess(eq(msg)); + } + + private void initializeMocks() { + when(ctxMock.getCassandraCluster()).thenReturn(cassandraClusterMock); + when(cassandraClusterMock.getSession()).thenReturn(sessionMock); + when(cassandraClusterMock.getDefaultWriteConsistencyLevel()).thenReturn(DefaultConsistencyLevel.ONE); + when(sessionMock.prepare(anyString())).thenReturn(preparedStatementMock); + } +} From 00251f11b1dd871981f01b101d7cec34920d7cfe Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 29 May 2024 11:19:15 +0300 Subject: [PATCH 02/21] refactored tests --- .../TbSaveToCustomCassandraTableNodeTest.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 699af15fa3..d760568bd4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -25,6 +25,7 @@ import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -95,10 +96,14 @@ public class TbSaveToCustomCassandraTableNodeTest { @Mock private Node nodeMock; + @BeforeEach + void setUp() { + node = spy(new TbSaveToCustomCassandraTableNode()); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + } + @Test void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { - node = new TbSaveToCustomCassandraTableNode(); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) .isInstanceOf(RuntimeException.class) @@ -107,8 +112,6 @@ public class TbSaveToCustomCassandraTableNodeTest { @Test void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { - node = new TbSaveToCustomCassandraTableNode(); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); config.setFieldsMapping(emptyMap()); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -121,8 +124,6 @@ public class TbSaveToCustomCassandraTableNodeTest { @Test void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { - node = new TbSaveToCustomCassandraTableNode(); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -131,8 +132,7 @@ public class TbSaveToCustomCassandraTableNodeTest { node.init(ctxMock, configuration); - String data = ""; - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(IllegalStateException.class) @@ -141,8 +141,6 @@ public class TbSaveToCustomCassandraTableNodeTest { @Test void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { - node = new TbSaveToCustomCassandraTableNode(); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -171,8 +169,6 @@ public class TbSaveToCustomCassandraTableNodeTest { @Test void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { - node = new TbSaveToCustomCassandraTableNode(); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -203,8 +199,6 @@ public class TbSaveToCustomCassandraTableNodeTest { @Test void givenValidMsgStructure_whenOnMsg_thenOk() throws Exception { - node = spy(new TbSaveToCustomCassandraTableNode()); - config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); From 6a9a933d4869475556d67d3fd3cad1526692bbeb Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 6 Jun 2024 16:18:43 +0300 Subject: [PATCH 03/21] added tests for check builder --- .../TbSaveToCustomCassandraTableNodeTest.java | 193 ++++++++++++------ 1 file changed, 134 insertions(+), 59 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index d760568bd4..9d0c646828 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -25,9 +25,14 @@ import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; @@ -47,14 +52,19 @@ import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; -import java.nio.ByteBuffer; -import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Stream; import static java.util.Collections.emptyMap; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -102,6 +112,11 @@ public class TbSaveToCustomCassandraTableNodeTest { config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); } + @AfterEach + void tearDown() { + node.destroy(); + } + @Test void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); @@ -126,14 +141,12 @@ public class TbSaveToCustomCassandraTableNodeTest { void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - initializeMocks(); + mockOnInit(); - node.init(ctxMock, configuration); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); - assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(IllegalStateException.class) .hasMessage("Invalid message structure, it is not a JSON Object:" + null); @@ -143,25 +156,18 @@ public class TbSaveToCustomCassandraTableNodeTest { void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - initializeMocks(); - when(preparedStatementMock.bind()).thenReturn(boundStatementMock); - when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); - when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); - when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); - when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); - when(boundStatementMock.getNode()).thenReturn(nodeMock); + mockOnInit(); + mockBoundStatement(); - node.init(ctxMock, configuration); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); String data = """ { "humidity": 77 } """; - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);; - + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(RuntimeException.class) .hasMessage("Message data doesn't contain key: 'temp'!"); @@ -171,18 +177,11 @@ public class TbSaveToCustomCassandraTableNodeTest { void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { config.setTableName("temperature_sensor"); config.setFieldsMapping(Map.of("temp", "temperature")); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - initializeMocks(); - when(preparedStatementMock.bind()).thenReturn(boundStatementMock); - when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); - when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); - when(boundStatementMock.getValues()).thenReturn(List.of(ByteBuffer.allocate(1))); - when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); - when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); - when(boundStatementMock.getNode()).thenReturn(nodeMock); + mockOnInit(); + mockBoundStatement(); - node.init(ctxMock, configuration); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); String data = """ { @@ -190,53 +189,129 @@ public class TbSaveToCustomCassandraTableNodeTest { } """; TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); - - assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(RuntimeException.class) .hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!"); } @Test - void givenValidMsgStructure_whenOnMsg_thenOk() throws Exception { - config.setTableName("temperature_sensor"); - config.setFieldsMapping(Map.of("temp", "temperature")); - var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + void givenValidMsgStructureAndKeyIsEntityId_whenOnMsg_thenOk() throws Exception { + config.setTableName("readings"); + config.setFieldsMapping(Map.of("$entityId", "entityId")); - initializeMocks(); - doAnswer(invocation -> boundStatementBuilderMock).when(node).getStmtBuilder(); - when(boundStatementBuilderMock.setLong(anyInt(), anyLong())).thenReturn(boundStatementBuilderMock); - when(boundStatementBuilderMock.build()).thenReturn(boundStatementMock); - when(ctxMock.getTenantId()).thenReturn(TENANT_ID); - doAnswer(invocation -> { - SettableFuture mainFuture = SettableFuture.create(); - mainFuture.set(new TbResultSet(null, null, null)); - TbResultSetFuture value = new TbResultSetFuture(mainFuture); - return value; - }).when(ctxMock).submitCassandraWriteTask(any()); - when(ctxMock.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + mockOnInit(); + when(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).thenReturn(boundStatementBuilderMock); + when(boundStatementMock.getConsistencyLevel()).thenReturn(DefaultConsistencyLevel.ONE); + mockBoundStatementBuilder(); + mockSubmittingCassandraTask(); - node.init(ctxMock, configuration); - - String data = """ - { - "temp": 31, - "humidity": 77 - } - """; - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); node.onMsg(ctxMock, msg); - verify(sessionMock).prepare(eq("INSERT INTO cs_tb_temperature_sensor(temperature) VALUES(?)")); - verify(ctxMock).submitCassandraWriteTask(any(CassandraStatementTask.class)); - verify(ctxMock).tellSuccess(eq(msg)); + verify(sessionMock).prepare("INSERT INTO cs_tb_readings(entityId) VALUES(?)"); + verify(boundStatementBuilderMock).setUuid(anyInt(), eq(DEVICE_ID.getId())); + + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); + verify(ctxMock).submitCassandraWriteTask(taskCaptor.capture()); + CassandraStatementTask task = taskCaptor.getValue(); + assertThat(task.getTenantId()).isEqualTo(TENANT_ID); + assertThat(task.getSession()).isEqualTo(sessionMock); + assertThat(task.getStatement()).isEqualTo(boundStatementMock); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(ctxMock).tellSuccess(msg) + ); } - private void initializeMocks() { + @ParameterizedTest + @MethodSource + void givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable( + String valueFromData, + Consumer mockBuilderConsumer, + Consumer verifyBuilderConsumer) throws TbNodeException { + config.setTableName("sensor_readings"); + config.setFieldsMapping(Map.of("msgField1", "tableColumn1", "msgField2", "tableColumn2")); + + mockOnInit(); + mockBuilderConsumer.accept(boundStatementBuilderMock); + mockBoundStatementBuilder(); + mockSubmittingCassandraTask(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + String data = String.format(""" + { + "msgField1": %s, + "msgField2": "value2", + "anotherField": 22 + } + """, valueFromData); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + node.onMsg(ctxMock, msg); + + verifyBuilderConsumer.accept(boundStatementBuilderMock); + + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); + verify(ctxMock).submitCassandraWriteTask(taskCaptor.capture()); + CassandraStatementTask task = taskCaptor.getValue(); + assertThat(task.getTenantId()).isEqualTo(TENANT_ID); + assertThat(task.getSession()).isEqualTo(sessionMock); + assertThat(task.getStatement()).isEqualTo(boundStatementMock); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> verify(ctxMock).tellSuccess(msg) + ); + } + + private static Stream givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable() { + return Stream.of( + Arguments.of("1.0", + (Consumer) mockBuilder -> when(mockBuilder.setDouble(anyInt(), anyDouble())).thenReturn(mockBuilder), + (Consumer) verifyBuilder -> verify(verifyBuilder).setDouble(anyInt(), eq(1.0))), + Arguments.of("2", + (Consumer) mockBuilder -> when(mockBuilder.setLong(anyInt(), anyLong())).thenReturn(mockBuilder), + (Consumer) verifyBuilder -> verify(verifyBuilder).setLong(anyInt(), eq(2L))), + Arguments.of("true", + (Consumer) mockBuilder -> when(mockBuilder.setBoolean(anyInt(), anyBoolean())).thenReturn(mockBuilder), + (Consumer) verifyBuilder -> verify(verifyBuilder).setBoolean(anyInt(), eq(true))), + Arguments.of("\"string value\"", + (Consumer) mockBuilder -> when(mockBuilder.setString(anyInt(), anyString())).thenReturn(mockBuilder), + (Consumer) verifyBuilder -> verify(verifyBuilder).setString(anyInt(), eq("string value"))), + Arguments.of("{\"key\":\"value\"}", + (Consumer) mockBuilder -> when(mockBuilder.setString(anyInt(), anyString())).thenReturn(mockBuilder), + (Consumer) verifyBuilder -> verify(verifyBuilder).setString(anyInt(), eq("{\"key\":\"value\"}"))) + ); + } + + private void mockOnInit() { when(ctxMock.getCassandraCluster()).thenReturn(cassandraClusterMock); when(cassandraClusterMock.getSession()).thenReturn(sessionMock); when(cassandraClusterMock.getDefaultWriteConsistencyLevel()).thenReturn(DefaultConsistencyLevel.ONE); when(sessionMock.prepare(anyString())).thenReturn(preparedStatementMock); } + + private void mockBoundStatement() { + when(preparedStatementMock.bind()).thenReturn(boundStatementMock); + when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); + when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); + when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); + when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); + when(boundStatementMock.getNode()).thenReturn(nodeMock); + } + + private void mockBoundStatementBuilder() { + doAnswer(invocation -> boundStatementBuilderMock).when(node).getStmtBuilder(); + when(boundStatementBuilderMock.build()).thenReturn(boundStatementMock); + } + + private void mockSubmittingCassandraTask() { + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + doAnswer(invocation -> { + SettableFuture mainFuture = SettableFuture.create(); + mainFuture.set(new TbResultSet(null, null, null)); + return new TbResultSetFuture(mainFuture); + }).when(ctxMock).submitCassandraWriteTask(any()); + when(ctxMock.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + } + } From 3368b9480b8ea6f49e8657209039cae8ae00f586 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 6 Jun 2024 16:29:38 +0300 Subject: [PATCH 04/21] added test for default config --- .../engine/action/TbSaveToCustomCassandraTableNodeTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 9d0c646828..dab748b0a4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -117,6 +117,12 @@ public class TbSaveToCustomCassandraTableNodeTest { node.destroy(); } + @Test + void verifyDefaultConfig() { + assertThat(config.getTableName()).isEqualTo(""); + assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); + } + @Test void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); From c137a144816c47446080925750ca5edcef80d020 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 7 Jun 2024 18:03:45 +0300 Subject: [PATCH 05/21] created complex test instead of parametrized --- .../TbSaveToCustomCassandraTableNode.java | 4 +- .../TbSaveToCustomCassandraTableNodeTest.java | 114 ++++++------------ 2 files changed, 38 insertions(+), 80 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 1e55e9d320..d262edbb44 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -88,7 +88,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class); cassandraCluster = ctx.getCassandraCluster(); if (cassandraCluster == null) { - throw new RuntimeException("Unable to connect to Cassandra database"); + throw new TbNodeException("Unable to connect to Cassandra database"); } else { startExecutor(); saveStmt = getSaveStmt(); @@ -170,7 +170,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private ListenableFuture save(TbMsg msg, TbContext ctx) { JsonElement data = parser.parse(msg.getData()); if (!data.isJsonObject()) { - throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); + throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data); } else { JsonObject dataAsObject = data.getAsJsonObject(); BoundStatementBuilder stmtBuilder = getStmtBuilder(); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index dab748b0a4..d841928041 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -23,15 +23,11 @@ import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; - import com.google.common.util.concurrent.SettableFuture; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -54,14 +50,10 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; -import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; @@ -127,7 +119,7 @@ public class TbSaveToCustomCassandraTableNodeTest { void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) - .isInstanceOf(RuntimeException.class) + .isInstanceOf(TbNodeException.class) .hasMessage("Unable to connect to Cassandra database"); } @@ -155,7 +147,7 @@ public class TbSaveToCustomCassandraTableNodeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) .isInstanceOf(IllegalStateException.class) - .hasMessage("Invalid message structure, it is not a JSON Object:" + null); + .hasMessage("Invalid message structure, it is not a JSON Object: " + null); } @Test @@ -201,92 +193,44 @@ public class TbSaveToCustomCassandraTableNodeTest { } @Test - void givenValidMsgStructureAndKeyIsEntityId_whenOnMsg_thenOk() throws Exception { + void givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable() throws TbNodeException { config.setTableName("readings"); - config.setFieldsMapping(Map.of("$entityId", "entityId")); + config.setFieldsMapping(Map.of( + "$entityId", "entityIdTableColumn", + "doubleField", "doubleTableColumn", + "longField", "longTableColumn", + "booleanField", "booleanTableColumn", + "stringField", "stringTableColumn", + "jsonField", "jsonTableColumn" + )); mockOnInit(); - when(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).thenReturn(boundStatementBuilderMock); - when(boundStatementMock.getConsistencyLevel()).thenReturn(DefaultConsistencyLevel.ONE); mockBoundStatementBuilder(); mockSubmittingCassandraTask(); node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); - node.onMsg(ctxMock, msg); - - verify(sessionMock).prepare("INSERT INTO cs_tb_readings(entityId) VALUES(?)"); - verify(boundStatementBuilderMock).setUuid(anyInt(), eq(DEVICE_ID.getId())); - - ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); - verify(ctxMock).submitCassandraWriteTask(taskCaptor.capture()); - CassandraStatementTask task = taskCaptor.getValue(); - assertThat(task.getTenantId()).isEqualTo(TENANT_ID); - assertThat(task.getSession()).isEqualTo(sessionMock); - assertThat(task.getStatement()).isEqualTo(boundStatementMock); - await().atMost(1, TimeUnit.SECONDS).untilAsserted( - () -> verify(ctxMock).tellSuccess(msg) - ); - } - - @ParameterizedTest - @MethodSource - void givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable( - String valueFromData, - Consumer mockBuilderConsumer, - Consumer verifyBuilderConsumer) throws TbNodeException { - config.setTableName("sensor_readings"); - config.setFieldsMapping(Map.of("msgField1", "tableColumn1", "msgField2", "tableColumn2")); - - mockOnInit(); - mockBuilderConsumer.accept(boundStatementBuilderMock); - mockBoundStatementBuilder(); - mockSubmittingCassandraTask(); - - node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); - - String data = String.format(""" + String data = """ { - "msgField1": %s, - "msgField2": "value2", - "anotherField": 22 + "doubleField": 22.5, + "longField": 56, + "booleanField": true, + "stringField": "some string", + "jsonField": { + "key": "value" + } } - """, valueFromData); + """; TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); node.onMsg(ctxMock, msg); - verifyBuilderConsumer.accept(boundStatementBuilderMock); - + verifySettingStatementBuilder(); ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); verify(ctxMock).submitCassandraWriteTask(taskCaptor.capture()); CassandraStatementTask task = taskCaptor.getValue(); assertThat(task.getTenantId()).isEqualTo(TENANT_ID); assertThat(task.getSession()).isEqualTo(sessionMock); assertThat(task.getStatement()).isEqualTo(boundStatementMock); - await().atMost(1, TimeUnit.SECONDS).untilAsserted( - () -> verify(ctxMock).tellSuccess(msg) - ); - } - - private static Stream givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable() { - return Stream.of( - Arguments.of("1.0", - (Consumer) mockBuilder -> when(mockBuilder.setDouble(anyInt(), anyDouble())).thenReturn(mockBuilder), - (Consumer) verifyBuilder -> verify(verifyBuilder).setDouble(anyInt(), eq(1.0))), - Arguments.of("2", - (Consumer) mockBuilder -> when(mockBuilder.setLong(anyInt(), anyLong())).thenReturn(mockBuilder), - (Consumer) verifyBuilder -> verify(verifyBuilder).setLong(anyInt(), eq(2L))), - Arguments.of("true", - (Consumer) mockBuilder -> when(mockBuilder.setBoolean(anyInt(), anyBoolean())).thenReturn(mockBuilder), - (Consumer) verifyBuilder -> verify(verifyBuilder).setBoolean(anyInt(), eq(true))), - Arguments.of("\"string value\"", - (Consumer) mockBuilder -> when(mockBuilder.setString(anyInt(), anyString())).thenReturn(mockBuilder), - (Consumer) verifyBuilder -> verify(verifyBuilder).setString(anyInt(), eq("string value"))), - Arguments.of("{\"key\":\"value\"}", - (Consumer) mockBuilder -> when(mockBuilder.setString(anyInt(), anyString())).thenReturn(mockBuilder), - (Consumer) verifyBuilder -> verify(verifyBuilder).setString(anyInt(), eq("{\"key\":\"value\"}"))) - ); + verify(ctxMock).tellSuccess(msg); } private void mockOnInit() { @@ -307,6 +251,11 @@ public class TbSaveToCustomCassandraTableNodeTest { private void mockBoundStatementBuilder() { doAnswer(invocation -> boundStatementBuilderMock).when(node).getStmtBuilder(); + when(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.setDouble(anyInt(), anyDouble())).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.setLong(anyInt(), anyLong())).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.setBoolean(anyInt(), anyBoolean())).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.setString(anyInt(), anyString())).thenReturn(boundStatementBuilderMock); when(boundStatementBuilderMock.build()).thenReturn(boundStatementMock); } @@ -320,4 +269,13 @@ public class TbSaveToCustomCassandraTableNodeTest { when(ctxMock.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); } + private void verifySettingStatementBuilder() { + verify(boundStatementBuilderMock).setUuid(anyInt(), eq(DEVICE_ID.getId())); + verify(boundStatementBuilderMock).setDouble(anyInt(), eq(22.5)); + verify(boundStatementBuilderMock).setLong(anyInt(), eq(56L)); + verify(boundStatementBuilderMock).setBoolean(anyInt(), eq(true)); + verify(boundStatementBuilderMock).setString(anyInt(), eq("some string")); + verify(boundStatementBuilderMock).setString(anyInt(), eq("{\"key\":\"value\"}")); + } + } From e561e268719dc353b15d03c0c0caf9fccf2abeb2 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 11 Jun 2024 10:41:12 +0300 Subject: [PATCH 06/21] added possibility to set ttl via config --- .../TbSaveToCustomCassandraTableNode.java | 52 ++++++++++++++++--- ...CustomCassandraTableNodeConfiguration.java | 2 + 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 1aad815dfa..9a3950a1c2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +30,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; +import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -35,20 +38,23 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.cassandra.guava.GuavaSession; import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSetFuture; -import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.common.util.DonAsynchron.withCallback; @@ -57,6 +63,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; @RuleNode(type = ComponentType.ACTION, name = "save to custom table", configClazz = TbSaveToCustomCassandraTableNodeConfiguration.class, + version = 1, nodeDescription = "Node stores data from incoming Message payload to the Cassandra database into the predefined custom table" + " that should have cs_tb_ prefix, to avoid the data insertion to the common TB tables.
" + "Note: rule node can be used only for Cassandra DB.", @@ -81,6 +88,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private PreparedStatement saveStmt; private ExecutorService readResultsProcessingExecutor; private Map fieldsMap; + private long ttl; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -88,15 +96,25 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { cassandraCluster = ctx.getCassandraCluster(); if (cassandraCluster == null) { throw new RuntimeException("Unable to connect to Cassandra database"); - } else { - startExecutor(); - saveStmt = getSaveStmt(); + } + ctx.addTenantProfileListener(this::onTenantProfileUpdate); + onTenantProfileUpdate(ctx.getTenantProfile()); + startExecutor(); + saveStmt = getSaveStmt(); + } + + void onTenantProfileUpdate(TenantProfile tenantProfile) { + DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + long tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); + ttl = config.getDefaultTTL(); + if (ttl == 0L) { + ttl = tenantProfileDefaultStorageTtl; } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); + withCallback(save(msg, ctx, ttl), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); } @Override @@ -163,10 +181,13 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } + if (ttl > 0) { + query.append(" USING TTL ?"); + } return query.toString(); } - private ListenableFuture save(TbMsg msg, TbContext ctx) { + private ListenableFuture save(TbMsg msg, TbContext ctx, long ttl) { JsonElement data = JsonParser.parseString(msg.getData()); if (!data.isJsonObject()) { throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); @@ -204,6 +225,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } i.getAndIncrement(); }); + if (ttl > 0) { + stmtBuilder.setInt(i.get(), (int) ttl); + } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } } @@ -240,4 +264,20 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { }, readResultsProcessingExecutor); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (!oldConfiguration.has("defaultTTL")) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put("defaultTTL", 0); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 0a5f153192..8d3d78e8b9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,11 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; + private long defaultTTL; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); + configuration.setDefaultTTL(0L); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); From bef6cfd339b711b9a5e23d5dd39d4c08c760d6e9 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 19 Jun 2024 16:15:08 +0300 Subject: [PATCH 07/21] refactored tests --- .../TbSaveToCustomCassandraTableNode.java | 9 ++-- .../engine/telemetry/TbMsgTimeseriesNode.java | 2 +- .../TbSaveToCustomCassandraTableNodeTest.java | 41 ++++++++++--------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 2df4225b0f..d22ffe928a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -103,18 +103,17 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { saveStmt = getSaveStmt(); } - void onTenantProfileUpdate(TenantProfile tenantProfile) { + private void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); - long tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); ttl = config.getDefaultTTL(); if (ttl == 0L) { - ttl = tenantProfileDefaultStorageTtl; + ttl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - withCallback(save(msg, ctx, ttl), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); + withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); } @Override @@ -187,7 +186,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { return query.toString(); } - private ListenableFuture save(TbMsg msg, TbContext ctx, long ttl) { + private ListenableFuture save(TbMsg msg, TbContext ctx) { JsonElement data = JsonParser.parseString(msg.getData()); if (!data.isJsonObject()) { throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 081804639a..97b2ac116f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -75,7 +75,7 @@ public class TbMsgTimeseriesNode implements TbNode { onTenantProfileUpdate(ctx.getTenantProfile()); } - void onTenantProfileUpdate(TenantProfile tenantProfile) { + private void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 3ab997238a..5d520d5ab9 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -57,6 +57,7 @@ import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -75,10 +76,10 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.spy; import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willAnswer; -import static org.mockito.BDDMockito.never; @ExtendWith(MockitoExtension.class) public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgradeTest { @@ -136,7 +137,9 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) .isInstanceOf(TbNodeException.class) - .hasMessage("Unable to connect to Cassandra database"); + .hasMessage("Unable to connect to Cassandra database") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); } @Test @@ -238,15 +241,15 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad return Stream.of( Arguments.of(0, 0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", (Consumer) builder -> { - then(builder).should(never()).setInt(anyInt(), eq(0)); + then(builder).should(never()).setInt(anyInt(), anyInt()); }), Arguments.of(0, 5, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", (Consumer) builder -> { - then(builder).should().setInt(anyInt(), eq(432000)); + then(builder).should().setInt(1, 432000); }), Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", (Consumer) builder -> { - then(builder).should().setInt(anyInt(), eq(20)); + then(builder).should().setInt(1, 20); }) ); } @@ -255,14 +258,14 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad public void givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable() throws TbNodeException { config.setDefaultTTL(25L); config.setTableName("readings"); - config.setFieldsMapping(Map.of( - "$entityId", "entityIdTableColumn", - "doubleField", "doubleTableColumn", - "longField", "longTableColumn", - "booleanField", "booleanTableColumn", - "stringField", "stringTableColumn", - "jsonField", "jsonTableColumn" - )); + Map mappings = new LinkedHashMap<>(); + mappings.put("$entityId", "entityIdTableColumn"); + mappings.put("doubleField", "doubleTableColumn"); + mappings.put("longField", "longTableColumn"); + mappings.put("booleanField", "booleanTableColumn"); + mappings.put("stringField", "stringTableColumn"); + mappings.put("jsonField", "jsonTableColumn"); + config.setFieldsMapping(mappings); mockOnInit(); mockBoundStatementBuilder(); @@ -350,12 +353,12 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad } private void verifySettingStatementBuilder() { - then(boundStatementBuilderMock).should().setUuid(anyInt(), eq(DEVICE_ID.getId())); - then(boundStatementBuilderMock).should().setDouble(anyInt(), eq(22.5)); - then(boundStatementBuilderMock).should().setLong(anyInt(), eq(56L)); - then(boundStatementBuilderMock).should().setBoolean(anyInt(), eq(true)); - then(boundStatementBuilderMock).should().setString(anyInt(), eq("some string")); - then(boundStatementBuilderMock).should().setString(anyInt(), eq("{\"key\":\"value\"}")); + then(boundStatementBuilderMock).should().setUuid(0, DEVICE_ID.getId()); + then(boundStatementBuilderMock).should().setDouble(1, 22.5); + then(boundStatementBuilderMock).should().setLong(2, 56L); + then(boundStatementBuilderMock).should().setBoolean(3, true); + then(boundStatementBuilderMock).should().setString(4, "some string"); + then(boundStatementBuilderMock).should().setString(5, "{\"key\":\"value\"}"); then(boundStatementBuilderMock).should().setInt(anyInt(), eq(25)); } From bfdea4d7e3acac12c9b314b697b44dda3e560b08 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Thu, 20 Jun 2024 10:34:20 +0300 Subject: [PATCH 08/21] made exception in init method unrecoverable --- .../rule/engine/action/TbSaveToCustomCassandraTableNode.java | 2 +- .../engine/action/TbSaveToCustomCassandraTableNodeTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index d22ffe928a..e1aa1f6a13 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -95,7 +95,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class); cassandraCluster = ctx.getCassandraCluster(); if (cassandraCluster == null) { - throw new TbNodeException("Unable to connect to Cassandra database"); + throw new TbNodeException("Unable to connect to Cassandra database", true); } ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 5d520d5ab9..27d69f2d66 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -139,7 +139,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad .isInstanceOf(TbNodeException.class) .hasMessage("Unable to connect to Cassandra database") .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(false); + .isEqualTo(true); } @Test From 88a768f06732c46987fcd462ab8d8c0dc23c8581 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 21 Jun 2024 10:50:40 +0300 Subject: [PATCH 09/21] added verification for match values insertion into statement --- .../TbSaveToCustomCassandraTableNodeTest.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 27d69f2d66..b3f3f2caa4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -34,6 +34,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; @@ -57,7 +58,8 @@ import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSet; import org.thingsboard.server.dao.nosql.TbResultSetFuture; -import java.util.LinkedHashMap; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -74,7 +76,6 @@ import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.never; import static org.mockito.BDDMockito.spy; @@ -255,16 +256,17 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad } @Test - public void givenValidMsgStructure_whenOnMsg_thenSaveToCustomCassandraTable() throws TbNodeException { + public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { config.setDefaultTTL(25L); config.setTableName("readings"); - Map mappings = new LinkedHashMap<>(); - mappings.put("$entityId", "entityIdTableColumn"); - mappings.put("doubleField", "doubleTableColumn"); - mappings.put("longField", "longTableColumn"); - mappings.put("booleanField", "booleanTableColumn"); - mappings.put("stringField", "stringTableColumn"); - mappings.put("jsonField", "jsonTableColumn"); + Map mappings = Map.of( + "$entityId", "entityIdTableColumn", + "doubleField", "doubleTableColumn", + "longField", "longTableColumn", + "booleanField", "booleanTableColumn", + "stringField", "stringTableColumn", + "jsonField", "jsonTableColumn" + ); config.setFieldsMapping(mappings); mockOnInit(); @@ -353,13 +355,15 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad } private void verifySettingStatementBuilder() { - then(boundStatementBuilderMock).should().setUuid(0, DEVICE_ID.getId()); - then(boundStatementBuilderMock).should().setDouble(1, 22.5); - then(boundStatementBuilderMock).should().setLong(2, 56L); - then(boundStatementBuilderMock).should().setBoolean(3, true); - then(boundStatementBuilderMock).should().setString(4, "some string"); - then(boundStatementBuilderMock).should().setString(5, "{\"key\":\"value\"}"); - then(boundStatementBuilderMock).should().setInt(anyInt(), eq(25)); + Map fieldsMap = (Map) ReflectionTestUtils.getField(node, "fieldsMap"); + List values = new ArrayList<>(fieldsMap.values()); + then(boundStatementBuilderMock).should().setUuid(values.indexOf("entityIdTableColumn"), DEVICE_ID.getId()); + then(boundStatementBuilderMock).should().setDouble(values.indexOf("doubleTableColumn"), 22.5); + then(boundStatementBuilderMock).should().setLong(values.indexOf("longTableColumn"), 56L); + then(boundStatementBuilderMock).should().setBoolean(values.indexOf("booleanTableColumn"), true); + then(boundStatementBuilderMock).should().setString(values.indexOf("stringTableColumn"), "some string"); + then(boundStatementBuilderMock).should().setString(values.indexOf("jsonTableColumn"), "{\"key\":\"value\"}"); + then(boundStatementBuilderMock).should().setInt(values.size(), 25); } private TenantProfile getTenantProfileWithTtl(int ttlInDays) { From 4307d6f13ae2e8ca9887ebe95f468dd1f22d6828 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 1 Jul 2024 10:22:40 +0300 Subject: [PATCH 10/21] fixed rewriting ttl for existing nodes --- .../TbSaveToCustomCassandraTableNode.java | 28 +++++++--- ...CustomCassandraTableNodeConfiguration.java | 4 +- .../TbSaveToCustomCassandraTableNodeTest.java | 56 +++++++++++++++++-- 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index e1aa1f6a13..fcaf2420e9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -88,7 +90,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private PreparedStatement saveStmt; private ExecutorService readResultsProcessingExecutor; private Map fieldsMap; - private long ttl; + private Integer ttl; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -97,6 +99,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { if (cassandraCluster == null) { throw new TbNodeException("Unable to connect to Cassandra database", true); } + if (!isTableExists()) { + throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.", true); + } ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); startExecutor(); @@ -106,8 +111,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); ttl = config.getDefaultTTL(); - if (ttl == 0L) { - ttl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); + if (ttl != null && ttl == 0) { + ttl = (int) TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } } @@ -132,6 +137,15 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + private boolean isTableExists() { + KeyspaceMetadata keyspaceMetadata = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()).orElse(null); + if (keyspaceMetadata != null) { + TableMetadata tableMetadata = keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).orElse(null); + return tableMetadata != null; + } + return false; + } + private PreparedStatement prepare(String query) { return getSession().prepare(query); } @@ -180,7 +194,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } - if (ttl > 0) { + if (ttl != null && ttl > 0) { query.append(" USING TTL ?"); } return query.toString(); @@ -224,8 +238,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } i.getAndIncrement(); }); - if (ttl > 0) { - stmtBuilder.setInt(i.get(), (int) ttl); + if (ttl != null && ttl > 0) { + stmtBuilder.setInt(i.get(), ttl); } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } @@ -274,7 +288,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { case 0: if (!oldConfiguration.has("defaultTTL")) { hasChanges = true; - ((ObjectNode) oldConfiguration).put("defaultTTL", 0); + ((ObjectNode) oldConfiguration).putNull("defaultTTL"); } break; default: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 8d3d78e8b9..3efae275da 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; - private long defaultTTL; + private Integer defaultTTL; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); - configuration.setDefaultTTL(0L); + configuration.setDefaultTTL(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index b3f3f2caa4..d691f1eaa4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -21,7 +21,10 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; import com.google.common.util.concurrent.SettableFuture; import org.junit.jupiter.api.AfterEach; @@ -61,6 +64,7 @@ import org.thingsboard.server.dao.nosql.TbResultSetFuture; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -114,6 +118,12 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad private ProtocolVersion protocolVersionMock; @Mock private Node nodeMock; + @Mock + private Metadata metadataMock; + @Mock + private KeyspaceMetadata keyspaceMetadataMock; + @Mock + private TableMetadata tableMetadataMock; @BeforeEach public void setUp() { @@ -130,7 +140,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad public void verifyDefaultConfig() { assertThat(config.getTableName()).isEqualTo(""); assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); - assertThat(config.getDefaultTTL()).isEqualTo(0L); + assertThat(config.getDefaultTTL()).isEqualTo(0); } @Test @@ -143,12 +153,28 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad .isEqualTo(true); } + @Test + public void givenTableDoesNotExist_whenInit_thenThrowsException() { + config.setTableName("test_table"); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty()); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + @Test public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + config.setTableName("test_table"); config.setFieldsMapping(emptyMap()); var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + mockCassandraCluster(); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) @@ -215,7 +241,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad @ParameterizedTest @MethodSource - public void givenTtl_whenOnMsg_thenVerifyStatement(long ttlFromConfig, + public void givenTtl_whenOnMsg_thenVerifyStatement(Integer ttlFromConfig, int ttlFromTenantProfileInDays, String expectedQuery, Consumer verifyBuilder) throws TbNodeException { @@ -251,13 +277,17 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", (Consumer) builder -> { then(builder).should().setInt(1, 20); + }), + Arguments.of(null, 2, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", + (Consumer) builder -> { + then(builder).should(never()).setInt(anyInt(), anyInt()); }) ); } @Test public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { - config.setDefaultTTL(25L); + config.setDefaultTTL(25); config.setTableName("readings"); Map mappings = Map.of( "$entityId", "entityIdTableColumn", @@ -311,19 +341,33 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(0, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", true, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":null}" + ), + // default config for version 1 with upgrade from version 1 + Arguments.of(1, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}", + false, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}" ) ); } private void mockOnInit() { - given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + mockCassandraCluster(); given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); - given(cassandraClusterMock.getSession()).willReturn(sessionMock); given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); } + private void mockCassandraCluster() { + given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + given(cassandraClusterMock.getSession()).willReturn(sessionMock); + given(sessionMock.getMetadata()).willReturn(metadataMock); + given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace"); + given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock)); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock)); + } + private void mockBoundStatement() { given(preparedStatementMock.bind()).willReturn(boundStatementMock); given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock); From 47afd13cf7af100cb14d7ad72bc9aaaed6201ac3 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 1 Jul 2024 12:17:47 +0300 Subject: [PATCH 11/21] made exception on init recoverable --- .../action/TbSaveToCustomCassandraTableNode.java | 13 ++++--------- .../TbSaveToCustomCassandraTableNodeTest.java | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index fcaf2420e9..ddc4357674 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -21,8 +21,6 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; -import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; -import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; @@ -100,7 +98,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { throw new TbNodeException("Unable to connect to Cassandra database", true); } if (!isTableExists()) { - throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.", true); + throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster."); } ctx.addTenantProfileListener(this::onTenantProfileUpdate); onTenantProfileUpdate(ctx.getTenantProfile()); @@ -138,12 +136,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } private boolean isTableExists() { - KeyspaceMetadata keyspaceMetadata = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()).orElse(null); - if (keyspaceMetadata != null) { - TableMetadata tableMetadata = keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).orElse(null); - return tableMetadata != null; - } - return false; + var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()); + return keyspaceMdOpt.map(keyspaceMetadata -> + keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).isPresent()).orElse(false); } private PreparedStatement prepare(String query) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index d691f1eaa4..f3e7d0d004 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -165,7 +165,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad .isInstanceOf(TbNodeException.class) .hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.") .extracting(e -> ((TbNodeException) e).isUnrecoverable()) - .isEqualTo(true); + .isEqualTo(false); } @Test From d3d20aa449b0ff7103a55bda971e66e0bc9a7d9f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Oct 2024 16:45:11 +0200 Subject: [PATCH 12/21] used enqueue for input node for avoid latency --- .../server/actors/ruleChain/DefaultTbContext.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ec10402821..b5b389da78 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -172,8 +172,18 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { + ack(msg); + if (!msg.isValid()) { + return; + } msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg)); + TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); + TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(inputMsg)).build(); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); } @Override From e2a356703f5c4b7e8909d182f210fb102cd1d246 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 18 Oct 2024 17:05:22 +0200 Subject: [PATCH 13/21] minor refactoring, ack used if push is success --- .../actors/ruleChain/DefaultTbContext.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index b5b389da78..23741ad965 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -109,6 +109,8 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -172,18 +174,27 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - ack(msg); if (!msg.isValid()) { return; } - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); + inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(inputMsg)).build(); TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, null); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + ack(msg); + } + + @Override + public void onFailure(Throwable error) { + tellFailure(msg, error); + } + }); } @Override From 726c95c661516115171bbf5e1a18e3fc05d60dce Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Sun, 20 Oct 2024 13:41:56 +0200 Subject: [PATCH 14/21] fixed tests --- .../flow/AbstractRuleEngineFlowIntegrationTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..920ffdd211 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Mockito.spy; @@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); + Awaitility.await().atMost(3, TimeUnit.SECONDS) + .until(() -> + getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) + .getData() + .stream() + .filter(filterByPostTelemetryEventType()) + .count() == 2 + ); + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList()); From 9ba71056cebff1976d92e032abe15c35d46b704a Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Fri, 25 Oct 2024 11:52:54 +0300 Subject: [PATCH 15/21] refactored UserService to reuse functionality --- .../service/entitiy/user/DefaultUserService.java | 10 ++-------- .../org/thingsboard/server/dao/user/UserService.java | 2 ++ .../thingsboard/server/dao/user/UserServiceImpl.java | 10 ++++++++++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java index ad4b7c097e..3fe8f27873 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java @@ -90,16 +90,10 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException { UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId); if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) { - long ttl = userCredentials.getActivationTokenTtl(); - if (ttl < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration - userCredentials = userService.generateUserActivationToken(userCredentials); - userCredentials = userService.saveUserCredentials(tenantId, userCredentials); - ttl = userCredentials.getActivationTokenTtl(); - log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userId); - } + userCredentials = userService.refreshUserActivationToken(tenantId, userCredentials); String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request); String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken(); - return new UserActivationLink(link, ttl); + return new UserActivationLink(link, userCredentials.getActivationTokenTtl()); } else { throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java index 8f22812cfc..0d532cdfac 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java @@ -63,6 +63,8 @@ public interface UserService extends EntityDaoService { UserCredentials generateUserActivationToken(UserCredentials userCredentials); + UserCredentials refreshUserActivationToken(TenantId tenantId, UserCredentials userCredentials); + UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials); void deleteUser(TenantId tenantId, User user); diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java index ba0d3d69f3..d8eee2967a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java @@ -300,6 +300,16 @@ public class UserServiceImpl extends AbstractCachedEntityService Date: Wed, 6 Nov 2024 15:24:02 +0100 Subject: [PATCH 16/21] minor refactoring due to comments --- .../actors/ruleChain/DefaultTbContext.java | 37 +++++++------------ ...AbstractRuleEngineFlowIntegrationTest.java | 2 +- 2 files changed, 14 insertions(+), 25 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 23741ad965..5b4902afa5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -110,7 +110,6 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -177,24 +176,10 @@ class DefaultTbContext implements TbContext { if (!msg.isValid()) { return; } - TbMsg inputMsg = msg.copyWithRuleChainId(ruleChainId); - inputMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - TransportProtos.ToRuleEngineMsg toReMsg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(inputMsg)).build(); - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), inputMsg.getOriginator()); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, inputMsg.getId(), toReMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - ack(msg); - } - - @Override - public void onFailure(Throwable error) { - tellFailure(msg, error); - } - }); + TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); + tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @Override @@ -230,14 +215,10 @@ class DefaultTbContext implements TbContext { } return; } - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback( metadata -> { if (onSuccess != null) { onSuccess.run(); @@ -252,6 +233,14 @@ class DefaultTbContext implements TbContext { })); } + private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) { + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); + } + @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { TopicPartitionInfo tpi = resolvePartition(tbMsg); diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 920ffdd211..143c6c72a1 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -333,7 +333,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); - Awaitility.await().atMost(3, TimeUnit.SECONDS) + Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS) .until(() -> getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) .getData() From 5535f11f224473de1326768a919b486a5558129f Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Tue, 12 Nov 2024 11:51:15 +0200 Subject: [PATCH 17/21] fixed log printing --- .../server/service/entitiy/user/DefaultUserService.java | 4 +--- .../java/org/thingsboard/server/dao/user/UserService.java | 2 +- .../java/org/thingsboard/server/dao/user/UserServiceImpl.java | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java index 3fe8f27873..f98f05b447 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java @@ -35,8 +35,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; import org.thingsboard.server.service.security.system.SystemSecurityService; -import java.util.concurrent.TimeUnit; - @Service @TbCoreComponent @AllArgsConstructor @@ -90,7 +88,7 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException { UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId); if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) { - userCredentials = userService.refreshUserActivationToken(tenantId, userCredentials); + userCredentials = userService.checkUserActivationToken(tenantId, userCredentials); String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request); String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken(); return new UserActivationLink(link, userCredentials.getActivationTokenTtl()); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java index 0d532cdfac..a33d64b746 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java @@ -63,7 +63,7 @@ public interface UserService extends EntityDaoService { UserCredentials generateUserActivationToken(UserCredentials userCredentials); - UserCredentials refreshUserActivationToken(TenantId tenantId, UserCredentials userCredentials); + UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials); UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials); diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java index d8eee2967a..16b25a7ede 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java @@ -301,12 +301,12 @@ public class UserServiceImpl extends AbstractCachedEntityService Date: Wed, 20 Nov 2024 19:09:02 +0100 Subject: [PATCH 18/21] Entity Data Query optimization - entityFilter.isFetchLastLevelOnly() to hit index and avoid seq scan. (and nr.relation_type_group = 'COMMON') --- .../server/dao/sql/query/DefaultEntityQueryRepository.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index dd1b4278d7..36ae779edd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -663,7 +663,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id") .append(" and ") .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type"); - + notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query notExistsPart.append(")"); whereFilter += " and ( r_int.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")"; } @@ -755,7 +755,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type") .append(" and ") .append(whereFilter.toString().replaceAll("re\\.", "nr\\.")); - + notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query notExistsPart.append(")"); whereFilter.append(" and ( r_int.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")"); } From 1397ce80a02ee85794957ac8e39b332dff297cad Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 8 Nov 2024 08:36:09 +0100 Subject: [PATCH 19/21] set bigger timeout for getting resutl in tests --- .../timeseries/sql/LatestTimeseriesPerformanceTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java index d81eea997c..a2abfc28b6 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/sql/LatestTimeseriesPerformanceTest.java @@ -56,7 +56,8 @@ public class LatestTimeseriesPerformanceTest extends AbstractServiceTest { private static final String LONG_KEY = "longKey"; private static final String DOUBLE_KEY = "doubleKey"; private static final String BOOLEAN_KEY = "booleanKey"; - public static final int AMOUNT_OF_UNIQ_KEY = 10000; + private static final int AMOUNT_OF_UNIQ_KEY = 10000; + private static final int TIMEOUT = 100; private final Random random = new Random(); @@ -102,7 +103,7 @@ public class LatestTimeseriesPerformanceTest extends AbstractServiceTest { futures.add(save(generateDblEntry(getRandomKey()))); futures.add(save(generateBoolEntry(getRandomKey()))); } - Futures.allAsList(futures).get(60, TimeUnit.SECONDS); + Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS); long endTime = System.currentTimeMillis(); long totalTime = endTime - startTime; @@ -120,7 +121,7 @@ public class LatestTimeseriesPerformanceTest extends AbstractServiceTest { futures.add(save(generateDblEntry(i))); futures.add(save(generateBoolEntry(i))); } - Futures.allAsList(futures).get(60, TimeUnit.SECONDS); + Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS); } private ListenableFuture save(TsKvEntry tsKvEntry) { From 65f94878d24f17842602a98b27d247e7aad75759 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 27 Nov 2024 13:36:48 +0200 Subject: [PATCH 20/21] removed using TTL from tenant profile --- .../TbSaveToCustomCassandraTableNode.java | 24 +++------- ...CustomCassandraTableNodeConfiguration.java | 4 +- .../engine/telemetry/TbMsgTimeseriesNode.java | 2 +- .../TbSaveToCustomCassandraTableNodeTest.java | 45 ++++--------------- 4 files changed, 17 insertions(+), 58 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index ddc4357674..7d4cd64179 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -38,10 +38,8 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; -import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.dao.cassandra.CassandraCluster; @@ -54,7 +52,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.thingsboard.common.util.DonAsynchron.withCallback; @@ -88,7 +85,6 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { private PreparedStatement saveStmt; private ExecutorService readResultsProcessingExecutor; private Map fieldsMap; - private Integer ttl; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -100,20 +96,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { if (!isTableExists()) { throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster."); } - ctx.addTenantProfileListener(this::onTenantProfileUpdate); - onTenantProfileUpdate(ctx.getTenantProfile()); startExecutor(); saveStmt = getSaveStmt(); } - private void onTenantProfileUpdate(TenantProfile tenantProfile) { - DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); - ttl = config.getDefaultTTL(); - if (ttl != null && ttl == 0) { - ttl = (int) TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); - } - } - @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(save(msg, ctx), aVoid -> ctx.tellSuccess(msg), e -> ctx.tellFailure(msg, e), ctx.getDbCallbackExecutor()); @@ -189,7 +175,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } - if (ttl != null && ttl > 0) { + if (config.getDefaultTtL() > 0) { query.append(" USING TTL ?"); } return query.toString(); @@ -233,8 +219,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } i.getAndIncrement(); }); - if (ttl != null && ttl > 0) { - stmtBuilder.setInt(i.get(), ttl); + if (config.getDefaultTtL() > 0) { + stmtBuilder.setInt(i.get(), config.getDefaultTtL()); } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } @@ -281,9 +267,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { boolean hasChanges = false; switch (fromVersion) { case 0: - if (!oldConfiguration.has("defaultTTL")) { + if (!oldConfiguration.has("defaultTtL")) { hasChanges = true; - ((ObjectNode) oldConfiguration).putNull("defaultTTL"); + ((ObjectNode) oldConfiguration).put("defaultTtL", 0); } break; default: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 3efae275da..e391e178f5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; - private Integer defaultTTL; + private int defaultTtL; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); - configuration.setDefaultTTL(0); + configuration.setDefaultTtL(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index c99da715d9..c90e8e2375 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -75,7 +75,7 @@ public class TbMsgTimeseriesNode implements TbNode { onTenantProfileUpdate(ctx.getTenantProfile()); } - private void onTenantProfileUpdate(TenantProfile tenantProfile) { + void onTenantProfileUpdate(TenantProfile tenantProfile) { DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index f3e7d0d004..2ef5d4173c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -46,13 +46,9 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.cassandra.CassandraCluster; @@ -91,7 +87,6 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); - private final TenantProfileId TENANT_PROFILE_ID = new TenantProfileId(UUID.fromString("239af5ad-53c1-4179-802f-3821f92ed338")); private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); @@ -140,7 +135,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad public void verifyDefaultConfig() { assertThat(config.getTableName()).isEqualTo(""); assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); - assertThat(config.getDefaultTTL()).isEqualTo(0); + assertThat(config.getDefaultTtL()).isEqualTo(0); } @Test @@ -175,7 +170,6 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); mockCassandraCluster(); - given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); assertThatThrownBy(() -> node.init(ctxMock, configuration)) .isInstanceOf(TbNodeException.class) @@ -241,16 +235,14 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad @ParameterizedTest @MethodSource - public void givenTtl_whenOnMsg_thenVerifyStatement(Integer ttlFromConfig, - int ttlFromTenantProfileInDays, + public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig, String expectedQuery, Consumer verifyBuilder) throws TbNodeException { config.setTableName("readings"); config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn")); - config.setDefaultTTL(ttlFromConfig); + config.setDefaultTtL(ttlFromConfig); mockOnInit(); - given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(ttlFromTenantProfileInDays)); willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock); given(boundStatementBuilderMock.build()).willReturn(boundStatementMock); @@ -266,28 +258,20 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad private static Stream givenTtl_whenOnMsg_thenVerifyStatement() { return Stream.of( - Arguments.of(0, 0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", + Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", (Consumer) builder -> { then(builder).should(never()).setInt(anyInt(), anyInt()); }), - Arguments.of(0, 5, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", - (Consumer) builder -> { - then(builder).should().setInt(1, 432000); - }), - Arguments.of(20, 1, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", + Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", (Consumer) builder -> { then(builder).should().setInt(1, 20); - }), - Arguments.of(null, 2, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", - (Consumer) builder -> { - then(builder).should(never()).setInt(anyInt(), anyInt()); }) ); } @Test public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { - config.setDefaultTTL(25); + config.setDefaultTtL(25); config.setTableName("readings"); Map mappings = Map.of( "$entityId", "entityIdTableColumn", @@ -341,20 +325,19 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(0, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", true, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":null}" + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}" ), // default config for version 1 with upgrade from version 1 Arguments.of(1, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}", + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}", false, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTTL\":0}" + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}" ) ); } private void mockOnInit() { mockCassandraCluster(); - given(ctxMock.getTenantProfile()).willReturn(getTenantProfileWithTtl(5)); given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); } @@ -410,14 +393,4 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad then(boundStatementBuilderMock).should().setInt(values.size(), 25); } - private TenantProfile getTenantProfileWithTtl(int ttlInDays) { - var tenantProfile = new TenantProfile(TENANT_PROFILE_ID); - var tenantProfileData = new TenantProfileData(); - var tenantProfileConfiguration = new DefaultTenantProfileConfiguration(); - tenantProfileConfiguration.setDefaultStorageTtlDays(ttlInDays); - tenantProfileData.setConfiguration(tenantProfileConfiguration); - tenantProfile.setProfileData(tenantProfileData); - return tenantProfile; - } - } From baed618604d43c8a2de1d7cfa14c04d8cf0d7e39 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 27 Nov 2024 13:57:10 +0200 Subject: [PATCH 21/21] renamed property --- .../action/TbSaveToCustomCassandraTableNode.java | 10 +++++----- ...bSaveToCustomCassandraTableNodeConfiguration.java | 4 ++-- .../action/TbSaveToCustomCassandraTableNodeTest.java | 12 ++++++------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 7d4cd64179..a0dbc5da97 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -175,7 +175,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } - if (config.getDefaultTtL() > 0) { + if (config.getDefaultTtl() > 0) { query.append(" USING TTL ?"); } return query.toString(); @@ -219,8 +219,8 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } i.getAndIncrement(); }); - if (config.getDefaultTtL() > 0) { - stmtBuilder.setInt(i.get(), config.getDefaultTtL()); + if (config.getDefaultTtl() > 0) { + stmtBuilder.setInt(i.get(), config.getDefaultTtl()); } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } @@ -267,9 +267,9 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { boolean hasChanges = false; switch (fromVersion) { case 0: - if (!oldConfiguration.has("defaultTtL")) { + if (!oldConfiguration.has("defaultTtl")) { hasChanges = true; - ((ObjectNode) oldConfiguration).put("defaultTtL", 0); + ((ObjectNode) oldConfiguration).put("defaultTtl", 0); } break; default: diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index e391e178f5..fefe871f82 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,13 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; - private int defaultTtL; + private int defaultTtl; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); - configuration.setDefaultTtL(0); + configuration.setDefaultTtl(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java index 2ef5d4173c..7df2a982de 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -135,7 +135,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad public void verifyDefaultConfig() { assertThat(config.getTableName()).isEqualTo(""); assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); - assertThat(config.getDefaultTtL()).isEqualTo(0); + assertThat(config.getDefaultTtl()).isEqualTo(0); } @Test @@ -240,7 +240,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Consumer verifyBuilder) throws TbNodeException { config.setTableName("readings"); config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn")); - config.setDefaultTtL(ttlFromConfig); + config.setDefaultTtl(ttlFromConfig); mockOnInit(); willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); @@ -271,7 +271,7 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad @Test public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { - config.setDefaultTtL(25); + config.setDefaultTtl(25); config.setTableName("readings"); Map mappings = Map.of( "$entityId", "entityIdTableColumn", @@ -325,13 +325,13 @@ public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgrad Arguments.of(0, "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", true, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}" + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" ), // default config for version 1 with upgrade from version 1 Arguments.of(1, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}", + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}", false, - "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtL\":0}" + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" ) ); }