diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a62cb432da..d27bf40650 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -110,6 +110,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -173,8 +174,13 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg)); + if (!msg.isValid()) { + return; + } + TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); + tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @Override @@ -210,14 +216,10 @@ class DefaultTbContext implements TbContext { } return; } - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback( metadata -> { if (onSuccess != null) { onSuccess.run(); @@ -232,6 +234,14 @@ class DefaultTbContext implements TbContext { })); } + private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) { + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); + } + @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { TopicPartitionInfo tpi = resolvePartition(tbMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java index 4e9e8e954c..693f70cc9d 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java @@ -35,8 +35,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; import org.thingsboard.server.service.security.system.SystemSecurityService; -import java.util.concurrent.TimeUnit; - @Service @TbCoreComponent @AllArgsConstructor @@ -90,16 +88,10 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException { UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId); if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) { - long ttl = userCredentials.getActivationTokenTtl(); - if (ttl < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration - userCredentials = userService.generateUserActivationToken(userCredentials); - userCredentials = userService.saveUserCredentials(tenantId, userCredentials); - ttl = userCredentials.getActivationTokenTtl(); - log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userId); - } + userCredentials = userService.checkUserActivationToken(tenantId, userCredentials); String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request); String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken(); - return new UserActivationLink(link, ttl); + return new UserActivationLink(link, userCredentials.getActivationTokenTtl()); } else { throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS); } diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..143c6c72a1 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Mockito.spy; @@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); + Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> + getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) + .getData() + .stream() + .filter(filterByPostTelemetryEventType()) + .count() == 2 + ); + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList()); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java index 586ae50f5d..6412921675 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java @@ -63,6 +63,8 @@ public interface UserService extends EntityDaoService { UserCredentials generateUserActivationToken(UserCredentials userCredentials); + UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials); + UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials); void deleteUser(TenantId tenantId, User user); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index dd1b4278d7..36ae779edd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -663,7 +663,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id") .append(" and ") .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type"); - + notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query notExistsPart.append(")"); whereFilter += " and ( r_int.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")"; } @@ -755,7 +755,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type") .append(" and ") .append(whereFilter.toString().replaceAll("re\\.", "nr\\.")); - + notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query notExistsPart.append(")"); whereFilter.append(" and ( r_int.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")"); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java index f61854f062..206d55354d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java @@ -292,6 +292,16 @@ public class UserServiceImpl extends AbstractCachedEntityServicecs_tb_ prefix, to avoid the data insertion to the common TB tables.
" + "Note: rule node can be used only for Cassandra DB.", @@ -87,11 +91,13 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class); cassandraCluster = ctx.getCassandraCluster(); if (cassandraCluster == null) { - throw new RuntimeException("Unable to connect to Cassandra database"); - } else { - startExecutor(); - saveStmt = getSaveStmt(); + throw new TbNodeException("Unable to connect to Cassandra database", true); } + if (!isTableExists()) { + throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster."); + } + startExecutor(); + saveStmt = getSaveStmt(); } @Override @@ -115,6 +121,12 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + private boolean isTableExists() { + var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()); + return keyspaceMdOpt.map(keyspaceMetadata -> + keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).isPresent()).orElse(false); + } + private PreparedStatement prepare(String query) { return getSession().prepare(query); } @@ -127,10 +139,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { return session; } - private PreparedStatement getSaveStmt() { + private PreparedStatement getSaveStmt() throws TbNodeException { fieldsMap = config.getFieldsMapping(); if (fieldsMap.isEmpty()) { - throw new RuntimeException("Fields(key,value) map is empty!"); + throw new TbNodeException("Fields(key,value) map is empty!", true); } else { return prepareStatement(new ArrayList<>(fieldsMap.values())); } @@ -163,16 +175,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } + if (config.getDefaultTtl() > 0) { + query.append(" USING TTL ?"); + } return query.toString(); } private ListenableFuture save(TbMsg msg, TbContext ctx) { JsonElement data = JsonParser.parseString(msg.getData()); if (!data.isJsonObject()) { - throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); + throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data); } else { JsonObject dataAsObject = data.getAsJsonObject(); - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind()); + BoundStatementBuilder stmtBuilder = getStmtBuilder(); AtomicInteger i = new AtomicInteger(0); fieldsMap.forEach((key, value) -> { if (key.equals(ENTITY_ID)) { @@ -197,17 +212,24 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } else if (dataKeyElement.isJsonObject()) { stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString()); } else { - throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!"); + throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!"); } } else { throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!"); } i.getAndIncrement(); }); + if (config.getDefaultTtl() > 0) { + stmtBuilder.setInt(i.get(), config.getDefaultTtl()); + } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } } + BoundStatementBuilder getStmtBuilder() { + return new BoundStatementBuilder(saveStmt.bind()); + } + private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) { return executeAsync(ctx, statement, defaultWriteLevel); } @@ -240,4 +262,20 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { }, readResultsProcessingExecutor); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (!oldConfiguration.has("defaultTtl")) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put("defaultTtl", 0); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 0a5f153192..fefe871f82 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,11 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; + private int defaultTtl; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); + configuration.setDefaultTtl(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java new file mode 100644 index 0000000000..7df2a982de --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -0,0 +1,396 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.cassandra.guava.GuavaSession; +import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.nosql.TbResultSet; +import org.thingsboard.server.dao.nosql.TbResultSetFuture; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; + +@ExtendWith(MockitoExtension.class) +public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgradeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); + + private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); + + private TbSaveToCustomCassandraTableNode node; + private TbSaveToCustomCassandraTableNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private CassandraCluster cassandraClusterMock; + @Mock + private GuavaSession sessionMock; + @Mock + private PreparedStatement preparedStatementMock; + @Mock + private BoundStatement boundStatementMock; + @Mock + private BoundStatementBuilder boundStatementBuilderMock; + @Mock + private ColumnDefinitions columnDefinitionsMock; + @Mock + private CodecRegistry codecRegistryMock; + @Mock + private ProtocolVersion protocolVersionMock; + @Mock + private Node nodeMock; + @Mock + private Metadata metadataMock; + @Mock + private KeyspaceMetadata keyspaceMetadataMock; + @Mock + private TableMetadata tableMetadataMock; + + @BeforeEach + public void setUp() { + node = spy(new TbSaveToCustomCassandraTableNode()); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + } + + @AfterEach + public void tearDown() { + node.destroy(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(config.getTableName()).isEqualTo(""); + assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); + assertThat(config.getDefaultTtl()).isEqualTo(0); + } + + @Test + public void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Unable to connect to Cassandra database") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenTableDoesNotExist_whenInit_thenThrowsException() { + config.setTableName("test_table"); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty()); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @Test + public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + config.setTableName("test_table"); + config.setFieldsMapping(emptyMap()); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Fields(key,value) map is empty!"); + } + + @Test + public void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid message structure, it is not a JSON Object: " + null); + } + + @Test + public void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + mockBoundStatement(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + String data = """ + { + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data doesn't contain key: 'temp'!"); + } + + @Test + public void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + mockBoundStatement(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + String data = """ + { + "temp": [value] + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!"); + } + + @ParameterizedTest + @MethodSource + public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig, + String expectedQuery, + Consumer verifyBuilder) throws TbNodeException { + config.setTableName("readings"); + config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn")); + config.setDefaultTtl(ttlFromConfig); + + mockOnInit(); + willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); + given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.build()).willReturn(boundStatementMock); + mockSubmittingCassandraTask(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(sessionMock).should().prepare(expectedQuery); + verifyBuilder.accept(boundStatementBuilderMock); + } + + private static Stream givenTtl_whenOnMsg_thenVerifyStatement() { + return Stream.of( + Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", + (Consumer) builder -> { + then(builder).should(never()).setInt(anyInt(), anyInt()); + }), + Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", + (Consumer) builder -> { + then(builder).should().setInt(1, 20); + }) + ); + } + + @Test + public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { + config.setDefaultTtl(25); + config.setTableName("readings"); + Map mappings = Map.of( + "$entityId", "entityIdTableColumn", + "doubleField", "doubleTableColumn", + "longField", "longTableColumn", + "booleanField", "booleanTableColumn", + "stringField", "stringTableColumn", + "jsonField", "jsonTableColumn" + ); + config.setFieldsMapping(mappings); + + mockOnInit(); + mockBoundStatementBuilder(); + mockSubmittingCassandraTask(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + String data = """ + { + "doubleField": 22.5, + "longField": 56, + "booleanField": true, + "stringField": "some string", + "jsonField": { + "key": "value" + } + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + node.onMsg(ctxMock, msg); + + verifySettingStatementBuilder(); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); + then(ctxMock).should().submitCassandraWriteTask(taskCaptor.capture()); + CassandraStatementTask task = taskCaptor.getValue(); + assertThat(task.getTenantId()).isEqualTo(TENANT_ID); + assertThat(task.getSession()).isEqualTo(sessionMock); + assertThat(task.getStatement()).isEqualTo(boundStatementMock); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> then(ctxMock).should().tellSuccess(msg) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", + true, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" + ), + // default config for version 1 with upgrade from version 1 + Arguments.of(1, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}", + false, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" + ) + ); + } + + private void mockOnInit() { + mockCassandraCluster(); + given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); + given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); + } + + private void mockCassandraCluster() { + given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + given(cassandraClusterMock.getSession()).willReturn(sessionMock); + given(sessionMock.getMetadata()).willReturn(metadataMock); + given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace"); + given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock)); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock)); + } + + private void mockBoundStatement() { + given(preparedStatementMock.bind()).willReturn(boundStatementMock); + given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock); + given(preparedStatementMock.getVariableDefinitions()).willReturn(columnDefinitionsMock); + given(boundStatementMock.codecRegistry()).willReturn(codecRegistryMock); + given(boundStatementMock.protocolVersion()).willReturn(protocolVersionMock); + given(boundStatementMock.getNode()).willReturn(nodeMock); + } + + private void mockBoundStatementBuilder() { + willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); + given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setDouble(anyInt(), anyDouble())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setLong(anyInt(), anyLong())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setBoolean(anyInt(), anyBoolean())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setString(anyInt(), anyString())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setInt(anyInt(), anyInt())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.build()).willReturn(boundStatementMock); + } + + private void mockSubmittingCassandraTask() { + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + willAnswer(invocation -> { + SettableFuture mainFuture = SettableFuture.create(); + mainFuture.set(new TbResultSet(null, null, null)); + return new TbResultSetFuture(mainFuture); + }).given(ctxMock).submitCassandraWriteTask(any()); + given(ctxMock.getDbCallbackExecutor()).willReturn(dbCallbackExecutor); + } + + private void verifySettingStatementBuilder() { + Map fieldsMap = (Map) ReflectionTestUtils.getField(node, "fieldsMap"); + List values = new ArrayList<>(fieldsMap.values()); + then(boundStatementBuilderMock).should().setUuid(values.indexOf("entityIdTableColumn"), DEVICE_ID.getId()); + then(boundStatementBuilderMock).should().setDouble(values.indexOf("doubleTableColumn"), 22.5); + then(boundStatementBuilderMock).should().setLong(values.indexOf("longTableColumn"), 56L); + then(boundStatementBuilderMock).should().setBoolean(values.indexOf("booleanTableColumn"), true); + then(boundStatementBuilderMock).should().setString(values.indexOf("stringTableColumn"), "some string"); + then(boundStatementBuilderMock).should().setString(values.indexOf("jsonTableColumn"), "{\"key\":\"value\"}"); + then(boundStatementBuilderMock).should().setInt(values.size(), 25); + } + +}