From ada9f8b7ebd422a6918b5abcc7d11659bcdce046 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 16 Apr 2024 15:40:34 +0300 Subject: [PATCH] added tests for save to custom table node --- .../TbSaveToCustomCassandraTableNode.java | 8 +- .../TbSaveToCustomCassandraTableNodeTest.java | 248 ++++++++++++++++++ 2 files changed, 254 insertions(+), 2 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java index 4b957db9cd..1e55e9d320 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNode.java @@ -173,7 +173,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); } else { JsonObject dataAsObject = data.getAsJsonObject(); - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind()); + BoundStatementBuilder stmtBuilder = getStmtBuilder(); AtomicInteger i = new AtomicInteger(0); fieldsMap.forEach((key, value) -> { if (key.equals(ENTITY_ID)) { @@ -198,7 +198,7 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } else if (dataKeyElement.isJsonObject()) { stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString()); } else { - throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!"); + throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!"); } } else { throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!"); @@ -209,6 +209,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + BoundStatementBuilder getStmtBuilder() { + return new BoundStatementBuilder(saveStmt.bind()); + } + private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) { return executeAsync(ctx, statement, defaultWriteLevel); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java new file mode 100644 index 0000000000..699af15fa3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -0,0 +1,248 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; + +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.cassandra.guava.GuavaSession; +import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.nosql.TbResultSet; +import org.thingsboard.server.dao.nosql.TbResultSetFuture; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TbSaveToCustomCassandraTableNodeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); + + private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); + + private TbSaveToCustomCassandraTableNode node; + private TbSaveToCustomCassandraTableNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private CassandraCluster cassandraClusterMock; + @Mock + private GuavaSession sessionMock; + @Mock + private PreparedStatement preparedStatementMock; + @Mock + private BoundStatement boundStatementMock; + @Mock + private BoundStatementBuilder boundStatementBuilderMock; + @Mock + private ColumnDefinitions columnDefinitionsMock; + @Mock + private CodecRegistry codecRegistryMock; + @Mock + private ProtocolVersion protocolVersionMock; + @Mock + private Node nodeMock; + + @Test + void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Unable to connect to Cassandra database"); + } + + @Test + void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setFieldsMapping(emptyMap()); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + when(ctxMock.getCassandraCluster()).thenReturn(cassandraClusterMock); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Fields(key,value) map is empty!"); + } + + @Test + void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + + node.init(ctxMock, configuration); + + String data = ""; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid message structure, it is not a JSON Object:" + null); + } + + @Test + void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + when(preparedStatementMock.bind()).thenReturn(boundStatementMock); + when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); + when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); + when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); + when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); + when(boundStatementMock.getNode()).thenReturn(nodeMock); + + node.init(ctxMock, configuration); + + String data = """ + { + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);; + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data doesn't contain key: 'temp'!"); + } + + @Test + void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { + node = new TbSaveToCustomCassandraTableNode(); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + when(preparedStatementMock.bind()).thenReturn(boundStatementMock); + when(boundStatementMock.getPreparedStatement()).thenReturn(preparedStatementMock); + when(preparedStatementMock.getVariableDefinitions()).thenReturn(columnDefinitionsMock); + when(boundStatementMock.getValues()).thenReturn(List.of(ByteBuffer.allocate(1))); + when(boundStatementMock.codecRegistry()).thenReturn(codecRegistryMock); + when(boundStatementMock.protocolVersion()).thenReturn(protocolVersionMock); + when(boundStatementMock.getNode()).thenReturn(nodeMock); + + node.init(ctxMock, configuration); + + String data = """ + { + "temp": [value] + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!"); + } + + @Test + void givenValidMsgStructure_whenOnMsg_thenOk() throws Exception { + node = spy(new TbSaveToCustomCassandraTableNode()); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + initializeMocks(); + doAnswer(invocation -> boundStatementBuilderMock).when(node).getStmtBuilder(); + when(boundStatementBuilderMock.setLong(anyInt(), anyLong())).thenReturn(boundStatementBuilderMock); + when(boundStatementBuilderMock.build()).thenReturn(boundStatementMock); + when(ctxMock.getTenantId()).thenReturn(TENANT_ID); + doAnswer(invocation -> { + SettableFuture mainFuture = SettableFuture.create(); + mainFuture.set(new TbResultSet(null, null, null)); + TbResultSetFuture value = new TbResultSetFuture(mainFuture); + return value; + }).when(ctxMock).submitCassandraWriteTask(any()); + when(ctxMock.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); + + node.init(ctxMock, configuration); + + String data = """ + { + "temp": 31, + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + + node.onMsg(ctxMock, msg); + + verify(sessionMock).prepare(eq("INSERT INTO cs_tb_temperature_sensor(temperature) VALUES(?)")); + verify(ctxMock).submitCassandraWriteTask(any(CassandraStatementTask.class)); + verify(ctxMock).tellSuccess(eq(msg)); + } + + private void initializeMocks() { + when(ctxMock.getCassandraCluster()).thenReturn(cassandraClusterMock); + when(cassandraClusterMock.getSession()).thenReturn(sessionMock); + when(cassandraClusterMock.getDefaultWriteConsistencyLevel()).thenReturn(DefaultConsistencyLevel.ONE); + when(sessionMock.prepare(anyString())).thenReturn(preparedStatementMock); + } +}