Merge pull request #10581 from irynamatveieva/improvements/save-to-custom-table-node

Save to custom table node: add TTL option
This commit is contained in:
Viacheslav Klimov 2024-11-28 13:15:52 +02:00 committed by GitHub
commit be92fae2bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 489 additions and 28 deletions

View File

@ -110,6 +110,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
@ -173,8 +174,13 @@ class DefaultTbContext implements TbContext {
@Override
public void input(TbMsg msg, RuleChainId ruleChainId) {
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg));
if (!msg.isValid()) {
return;
}
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId);
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
}
@Override
@ -210,14 +216,10 @@ class DefaultTbContext implements TbContext {
}
return;
}
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
@ -232,6 +234,14 @@ class DefaultTbContext implements TbContext {
}));
}
private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) {
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
}
@Override
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);

View File

@ -35,8 +35,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
import org.thingsboard.server.service.security.system.SystemSecurityService;
import java.util.concurrent.TimeUnit;
@Service
@TbCoreComponent
@AllArgsConstructor
@ -90,16 +88,10 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse
public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException {
UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId);
if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) {
long ttl = userCredentials.getActivationTokenTtl();
if (ttl < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration
userCredentials = userService.generateUserActivationToken(userCredentials);
userCredentials = userService.saveUserCredentials(tenantId, userCredentials);
ttl = userCredentials.getActivationTokenTtl();
log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userId);
}
userCredentials = userService.checkUserActivationToken(tenantId, userCredentials);
String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request);
String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken();
return new UserActivationLink(link, ttl);
return new UserActivationLink(link, userCredentials.getActivationTokenTtl());
} else {
throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.mockito.Mockito.spy;
@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = rootRuleChain;
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() ->
getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000)
.getData()
.stream()
.filter(filterByPostTelemetryEventType())
.count() == 2
);
eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList());

View File

@ -63,6 +63,8 @@ public interface UserService extends EntityDaoService {
UserCredentials generateUserActivationToken(UserCredentials userCredentials);
UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials);
UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials);
void deleteUser(TenantId tenantId, User user);

View File

@ -663,7 +663,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
.append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
.append(" and ")
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query
notExistsPart.append(")");
whereFilter += " and ( r_int.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")";
}
@ -755,7 +755,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type")
.append(" and ")
.append(whereFilter.toString().replaceAll("re\\.", "nr\\."));
notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query
notExistsPart.append(")");
whereFilter.append(" and ( r_int.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")");
}

View File

@ -292,6 +292,16 @@ public class UserServiceImpl extends AbstractCachedEntityService<UserCacheKey, U
return userCredentials;
}
@Override
public UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials) {
if (userCredentials.getActivationTokenTtl() < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration
userCredentials = generateUserActivationToken(userCredentials);
userCredentials = saveUserCredentials(tenantId, userCredentials);
log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userCredentials.getUserId());
}
return userCredentials;
}
@Override
public UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials) {
log.trace("Executing replaceUserCredentials [{}]", userCredentials);

View File

@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -38,6 +40,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
@ -57,6 +60,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
@RuleNode(type = ComponentType.ACTION,
name = "save to custom table",
configClazz = TbSaveToCustomCassandraTableNodeConfiguration.class,
version = 1,
nodeDescription = "Node stores data from incoming Message payload to the Cassandra database into the predefined custom table" +
" that should have <b>cs_tb_</b> prefix, to avoid the data insertion to the common TB tables.<br>" +
"<b>Note:</b> rule node can be used only for Cassandra DB.",
@ -87,11 +91,13 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class);
cassandraCluster = ctx.getCassandraCluster();
if (cassandraCluster == null) {
throw new RuntimeException("Unable to connect to Cassandra database");
} else {
startExecutor();
saveStmt = getSaveStmt();
throw new TbNodeException("Unable to connect to Cassandra database", true);
}
if (!isTableExists()) {
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
}
startExecutor();
saveStmt = getSaveStmt();
}
@Override
@ -115,6 +121,12 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
}
}
private boolean isTableExists() {
var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName());
return keyspaceMdOpt.map(keyspaceMetadata ->
keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).isPresent()).orElse(false);
}
private PreparedStatement prepare(String query) {
return getSession().prepare(query);
}
@ -127,10 +139,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
return session;
}
private PreparedStatement getSaveStmt() {
private PreparedStatement getSaveStmt() throws TbNodeException {
fieldsMap = config.getFieldsMapping();
if (fieldsMap.isEmpty()) {
throw new RuntimeException("Fields(key,value) map is empty!");
throw new TbNodeException("Fields(key,value) map is empty!", true);
} else {
return prepareStatement(new ArrayList<>(fieldsMap.values()));
}
@ -163,16 +175,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
query.append("?, ");
}
}
if (config.getDefaultTtl() > 0) {
query.append(" USING TTL ?");
}
return query.toString();
}
private ListenableFuture<Void> save(TbMsg msg, TbContext ctx) {
JsonElement data = JsonParser.parseString(msg.getData());
if (!data.isJsonObject()) {
throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data);
throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data);
} else {
JsonObject dataAsObject = data.getAsJsonObject();
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind());
BoundStatementBuilder stmtBuilder = getStmtBuilder();
AtomicInteger i = new AtomicInteger(0);
fieldsMap.forEach((key, value) -> {
if (key.equals(ENTITY_ID)) {
@ -197,17 +212,24 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
} else if (dataKeyElement.isJsonObject()) {
stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString());
} else {
throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!");
throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!");
}
} else {
throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!");
}
i.getAndIncrement();
});
if (config.getDefaultTtl() > 0) {
stmtBuilder.setInt(i.get(), config.getDefaultTtl());
}
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
}
}
BoundStatementBuilder getStmtBuilder() {
return new BoundStatementBuilder(saveStmt.bind());
}
private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) {
return executeAsync(ctx, statement, defaultWriteLevel);
}
@ -240,4 +262,20 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
}, readResultsProcessingExecutor);
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (!oldConfiguration.has("defaultTtl")) {
hasChanges = true;
((ObjectNode) oldConfiguration).put("defaultTtl", 0);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

View File

@ -27,11 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
private String tableName;
private Map<String, String> fieldsMapping;
private int defaultTtl;
@Override
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
configuration.setDefaultTtl(0);
configuration.setTableName("");
Map<String, String> map = new HashMap<>();
map.put("", "");

View File

@ -0,0 +1,396 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willAnswer;
@ExtendWith(MockitoExtension.class)
public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgradeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7"));
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e"));
private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor();
private TbSaveToCustomCassandraTableNode node;
private TbSaveToCustomCassandraTableNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private CassandraCluster cassandraClusterMock;
@Mock
private GuavaSession sessionMock;
@Mock
private PreparedStatement preparedStatementMock;
@Mock
private BoundStatement boundStatementMock;
@Mock
private BoundStatementBuilder boundStatementBuilderMock;
@Mock
private ColumnDefinitions columnDefinitionsMock;
@Mock
private CodecRegistry codecRegistryMock;
@Mock
private ProtocolVersion protocolVersionMock;
@Mock
private Node nodeMock;
@Mock
private Metadata metadataMock;
@Mock
private KeyspaceMetadata keyspaceMetadataMock;
@Mock
private TableMetadata tableMetadataMock;
@BeforeEach
public void setUp() {
node = spy(new TbSaveToCustomCassandraTableNode());
config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration();
}
@AfterEach
public void tearDown() {
node.destroy();
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getTableName()).isEqualTo("");
assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", ""));
assertThat(config.getDefaultTtl()).isEqualTo(0);
}
@Test
public void givenCassandraClusterIsMissing_whenInit_thenThrowsException() {
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Unable to connect to Cassandra database")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
}
@Test
public void givenTableDoesNotExist_whenInit_thenThrowsException() {
config.setTableName("test_table");
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
mockCassandraCluster();
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty());
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(false);
}
@Test
public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() {
config.setTableName("test_table");
config.setFieldsMapping(emptyMap());
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
mockCassandraCluster();
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Fields(key,value) map is empty!");
}
@Test
public void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid message structure, it is not a JSON Object: " + null);
}
@Test
public void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
mockBoundStatement();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"humidity": 77
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class)
.hasMessage("Message data doesn't contain key: 'temp'!");
}
@Test
public void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
mockBoundStatement();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"temp": [value]
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class)
.hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!");
}
@ParameterizedTest
@MethodSource
public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig,
String expectedQuery,
Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException {
config.setTableName("readings");
config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn"));
config.setDefaultTtl(ttlFromConfig);
mockOnInit();
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
mockSubmittingCassandraTask();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(sessionMock).should().prepare(expectedQuery);
verifyBuilder.accept(boundStatementBuilderMock);
}
private static Stream<Arguments> givenTtl_whenOnMsg_thenVerifyStatement() {
return Stream.of(
Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
(Consumer<BoundStatementBuilder>) builder -> {
then(builder).should(never()).setInt(anyInt(), anyInt());
}),
Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
(Consumer<BoundStatementBuilder>) builder -> {
then(builder).should().setInt(1, 20);
})
);
}
@Test
public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException {
config.setDefaultTtl(25);
config.setTableName("readings");
Map<String, String> mappings = Map.of(
"$entityId", "entityIdTableColumn",
"doubleField", "doubleTableColumn",
"longField", "longTableColumn",
"booleanField", "booleanTableColumn",
"stringField", "stringTableColumn",
"jsonField", "jsonTableColumn"
);
config.setFieldsMapping(mappings);
mockOnInit();
mockBoundStatementBuilder();
mockSubmittingCassandraTask();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"doubleField": 22.5,
"longField": 56,
"booleanField": true,
"stringField": "some string",
"jsonField": {
"key": "value"
}
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
node.onMsg(ctxMock, msg);
verifySettingStatementBuilder();
ArgumentCaptor<CassandraStatementTask> taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class);
then(ctxMock).should().submitCassandraWriteTask(taskCaptor.capture());
CassandraStatementTask task = taskCaptor.getValue();
assertThat(task.getTenantId()).isEqualTo(TENANT_ID);
assertThat(task.getSession()).isEqualTo(sessionMock);
assertThat(task.getStatement()).isEqualTo(boundStatementMock);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> then(ctxMock).should().tellSuccess(msg)
);
}
@Override
protected TbNode getTestNode() {
return node;
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// config for version 1 with upgrade from version 0
Arguments.of(0,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}",
true,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}"
),
// default config for version 1 with upgrade from version 1
Arguments.of(1,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}",
false,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}"
)
);
}
private void mockOnInit() {
mockCassandraCluster();
given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE);
given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock);
}
private void mockCassandraCluster() {
given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock);
given(cassandraClusterMock.getSession()).willReturn(sessionMock);
given(sessionMock.getMetadata()).willReturn(metadataMock);
given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace");
given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock));
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock));
}
private void mockBoundStatement() {
given(preparedStatementMock.bind()).willReturn(boundStatementMock);
given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock);
given(preparedStatementMock.getVariableDefinitions()).willReturn(columnDefinitionsMock);
given(boundStatementMock.codecRegistry()).willReturn(codecRegistryMock);
given(boundStatementMock.protocolVersion()).willReturn(protocolVersionMock);
given(boundStatementMock.getNode()).willReturn(nodeMock);
}
private void mockBoundStatementBuilder() {
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setDouble(anyInt(), anyDouble())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setLong(anyInt(), anyLong())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setBoolean(anyInt(), anyBoolean())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setString(anyInt(), anyString())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setInt(anyInt(), anyInt())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
}
private void mockSubmittingCassandraTask() {
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
willAnswer(invocation -> {
SettableFuture<TbResultSet> mainFuture = SettableFuture.create();
mainFuture.set(new TbResultSet(null, null, null));
return new TbResultSetFuture(mainFuture);
}).given(ctxMock).submitCassandraWriteTask(any());
given(ctxMock.getDbCallbackExecutor()).willReturn(dbCallbackExecutor);
}
private void verifySettingStatementBuilder() {
Map<String, String> fieldsMap = (Map<String, String>) ReflectionTestUtils.getField(node, "fieldsMap");
List<String> values = new ArrayList<>(fieldsMap.values());
then(boundStatementBuilderMock).should().setUuid(values.indexOf("entityIdTableColumn"), DEVICE_ID.getId());
then(boundStatementBuilderMock).should().setDouble(values.indexOf("doubleTableColumn"), 22.5);
then(boundStatementBuilderMock).should().setLong(values.indexOf("longTableColumn"), 56L);
then(boundStatementBuilderMock).should().setBoolean(values.indexOf("booleanTableColumn"), true);
then(boundStatementBuilderMock).should().setString(values.indexOf("stringTableColumn"), "some string");
then(boundStatementBuilderMock).should().setString(values.indexOf("jsonTableColumn"), "{\"key\":\"value\"}");
then(boundStatementBuilderMock).should().setInt(values.size(), 25);
}
}