From 65a8b0a34c4972793f1c41f32761853d640fd871 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 13 Dec 2023 23:13:05 +0100 Subject: [PATCH] added test for singleton mqtt node --- .../server/msa/ContainerTestSuite.java | 2 + .../server/msa/TestProperties.java | 9 + .../server/msa/TestRestClient.java | 41 ++++ .../server/msa/rule/node/MqttNodeTest.java | 204 ++++++++++++++++++ .../resources/MqttRuleNodeTestMetadata.json | 79 +++++++ .../resources/docker-compose.mosquitto.yml | 25 +++ .../test/resources/mosquitto/mosquitto.conf | 18 ++ 7 files changed, 378 insertions(+) create mode 100644 msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java create mode 100644 msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json create mode 100644 msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml create mode 100644 msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index dd3948c495..142b42e5b2 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,6 +107,7 @@ public class ContainerTestSuite { List composeFiles = new ArrayList<>(Arrays.asList( new File(targetDir + "docker-compose.yml"), new File(targetDir + "docker-compose.volumes.yml"), + new File(targetDir + "docker-compose.mosquitto.yml"), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")), new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid-test-extras.yml" : "docker-compose.postgres-test-extras.yml")), new File(targetDir + "docker-compose.postgres.volumes.yml"), @@ -162,6 +163,7 @@ public class ContainerTestSuite { .withEnv(queueEnv) .withEnv("LOAD_BALANCER_NAME", "") .withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) + .withExposedService("broker", 1883) .waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) .waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT)) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java index a70d1022ce..26f9da3618 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestProperties.java @@ -48,4 +48,13 @@ public class TestProperties { } return System.getProperty("tb.wsUrl", "ws://localhost:8080"); } + + public static String getMqttBrokerUrl() { + if (instance.isActive()) { + String host = instance.getTestContainer().getServiceHost("broker", 1883); + Integer port = instance.getTestContainer().getServicePort("broker", 1883); + return "tcp://" + host + ":" + port; + } + return System.getProperty("mqtt.broker", "tcp://localhost:1883"); + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java index f04331e01e..a675ef4a20 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/TestRestClient.java @@ -31,10 +31,12 @@ import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; @@ -45,9 +47,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; @@ -527,4 +531,41 @@ public class TestRestClient { .then() .statusCode(HTTP_OK); } + + public PageData getEvents(EntityId entityId, EventType eventType, TenantId tenantId, TimePageLink pageLink) { + Map params = new HashMap<>(); + params.put("entityType", entityId.getEntityType().name()); + params.put("entityId", entityId.getId().toString()); + params.put("eventType", eventType.name()); + params.put("tenantId", tenantId.getId().toString()); + addTimePageLinkToParam(params, pageLink); + + return given().spec(requestSpec) + .get("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&" + getTimeUrlParams(pageLink), params) + .then() + .statusCode(HTTP_OK) + .extract() + .as(new TypeRef<>() {}); + } + + private void addTimePageLinkToParam(Map params, TimePageLink pageLink) { + this.addPageLinkToParam(params, pageLink); + if (pageLink.getStartTime() != null) { + params.put("startTime", String.valueOf(pageLink.getStartTime())); + } + if (pageLink.getEndTime() != null) { + params.put("endTime", String.valueOf(pageLink.getEndTime())); + } + } + + private String getTimeUrlParams(TimePageLink pageLink) { + String urlParams = getUrlParams(pageLink); + if (pageLink.getStartTime() != null) { + urlParams += "&startTime={startTime}"; + } + if (pageLink.getEndTime() != null) { + urlParams += "&endTime={endTime}"; + } + return urlParams; + } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java new file mode 100644 index 0000000000..5aa0ae48fb --- /dev/null +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/rule/node/MqttNodeTest.java @@ -0,0 +1,204 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.msa.rule.node; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.eclipse.paho.client.mqttv3.IMqttMessageListener; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EventInfo; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.event.EventType; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.rule.NodeConnectionInfo; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.msa.AbstractContainerTest; +import org.thingsboard.server.msa.DisableUIListeners; +import org.thingsboard.server.msa.TestProperties; +import org.thingsboard.server.msa.WsClient; +import org.thingsboard.server.msa.mapper.WsTelemetryResponse; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.fail; +import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype; + +@DisableUIListeners +@Slf4j +public class MqttNodeTest extends AbstractContainerTest { + + private static final String TOPIC = "tb/mqtt/device"; + + private Device device; + + @BeforeMethod + public void setUp() { + testRestClient.login("tenant@thingsboard.org", "tenant"); + device = testRestClient.postDevice("", defaultDevicePrototype("mqtt_")); + } + + @AfterMethod + public void tearDown() { + testRestClient.deleteDeviceIfExists(device.getId()); + } + + @Test + public void telemetryUpload() throws Exception { + RuleChainId defaultRuleChainId = getDefaultRuleChainId(); + + createRootRuleChainWithTestNode("MqttRuleNodeTestMetadata.json", "org.thingsboard.rule.engine.mqtt.TbMqttNode", 2); + + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS); + + MqttMessageListener messageListener = new MqttMessageListener(); + MqttClient responseClient = new MqttClient(TestProperties.getMqttBrokerUrl(), StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + responseClient.connect(); + responseClient.subscribe(TOPIC, messageListener); + + MqttClient mqttClient = new MqttClient("tcp://localhost:1883", StringUtils.randomAlphanumeric(10), new MemoryPersistence()); + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(deviceCredentials.getCredentialsId()); + mqttClient.connect(mqttConnectOptions); + mqttClient.publish("v1/devices/me/telemetry", new MqttMessage(createPayload().toString().getBytes())); + + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(); + log.info("Received telemetry: {}", actualLatestTelemetry); + wsClient.closeBlocking(); + + assertThat(actualLatestTelemetry.getData()).hasSize(4); + assertThat(actualLatestTelemetry.getLatestValues().keySet()).containsOnlyOnceElementsOf(Arrays.asList("booleanKey", "stringKey", "doubleKey", "longKey")); + + assertThat(actualLatestTelemetry.getDataValuesByKey("booleanKey").get(1)).isEqualTo(Boolean.TRUE.toString()); + assertThat(actualLatestTelemetry.getDataValuesByKey("stringKey").get(1)).isEqualTo("value1"); + assertThat(actualLatestTelemetry.getDataValuesByKey("doubleKey").get(1)).isEqualTo(Double.toString(42.0)); + assertThat(actualLatestTelemetry.getDataValuesByKey("longKey").get(1)).isEqualTo(Long.toString(73)); + + Awaitility + .await() + .alias("Get integration events") + .atMost(10, TimeUnit.SECONDS) + .until(() -> messageListener.getEvents().size() > 0); + + BlockingQueue events = messageListener.getEvents(); + JsonNode actual = JacksonUtil.toJsonNode(Objects.requireNonNull(events.poll()).message); + + assertThat(actual.get("stringKey").asText()).isEqualTo("value1"); + assertThat(actual.get("booleanKey").asBoolean()).isEqualTo(Boolean.TRUE); + assertThat(actual.get("doubleKey").asDouble()).isEqualTo(42.0); + assertThat(actual.get("longKey").asLong()).isEqualTo(73); + + testRestClient.setRootRuleChain(defaultRuleChainId); + } + + @Data + private class MqttMessageListener implements IMqttMessageListener { + private final BlockingQueue events; + + private MqttMessageListener() { + events = new ArrayBlockingQueue<>(100); + } + + @Override + public void messageArrived(String s, MqttMessage mqttMessage) { + log.info("MQTT message [{}], topic [{}]", mqttMessage.toString(), s); + events.add(new MqttEvent(s, mqttMessage.toString())); + } + + public BlockingQueue getEvents() { + return events; + } + } + + @Data + private class MqttEvent { + private final String topic; + private final String message; + } + + private RuleChainId getDefaultRuleChainId() { + PageData ruleChains = testRestClient.getRuleChains(new PageLink(40, 0)); + + Optional defaultRuleChain = ruleChains.getData() + .stream() + .filter(RuleChain::isRoot) + .findFirst(); + if (!defaultRuleChain.isPresent()) { + fail("Root rule chain wasn't found"); + } + return defaultRuleChain.get().getId(); + } + + protected RuleChainId createRootRuleChainWithTestNode(String ruleChainMetadataFile, String ruleNodeType, int eventsCount) throws Exception { + RuleChain newRuleChain = new RuleChain(); + newRuleChain.setName("testRuleChain"); + RuleChain ruleChain = testRestClient.postRuleChain(newRuleChain); + + JsonNode configuration = JacksonUtil.OBJECT_MAPPER.readTree(this.getClass().getClassLoader().getResourceAsStream(ruleChainMetadataFile)); + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(ruleChain.getId()); + ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt()); + ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class))); + ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class))); + + ruleChainMetaData = testRestClient.postRuleChainMetadata(ruleChainMetaData); + + testRestClient.setRootRuleChain(ruleChain.getId()); + + RuleNode node = ruleChainMetaData.getNodes().stream().filter(ruleNode -> ruleNode.getType().equals(ruleNodeType)).findFirst().get(); + + Awaitility + .await() + .alias("Get events from rule chain") + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + PageData events = testRestClient.getEvents(node.getId(), EventType.LC_EVENT, ruleChain.getTenantId(), new TimePageLink(1024)); + List eventInfos = events.getData().stream().filter(eventInfo -> + "STARTED".equals(eventInfo.getBody().get("event").asText()) && + "true".equals(eventInfo.getBody().get("success").asText())) + .collect(Collectors.toList()); + + return eventInfos.size() == eventsCount; + }); + + return ruleChain.getId(); + } +} diff --git a/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json new file mode 100644 index 0000000000..e6c9f5a53f --- /dev/null +++ b/msa/black-box-tests/src/test/resources/MqttRuleNodeTestMetadata.json @@ -0,0 +1,79 @@ +{ + "firstNodeIndex": 2, + "nodes": [ + { + "additionalInfo": { + "description": "", + "layoutX": 626, + "layoutY": 152 + }, + "type": "org.thingsboard.rule.engine.mqtt.TbMqttNode", + "name": "test mqtt", + "debugMode": true, + "singletonMode": true, + "queueName": "HighPriority", + "configurationVersion": 0, + "configuration": { + "topicPattern": "tb/mqtt/device", + "host": "broker", + "port": 1883, + "connectTimeoutSec": 10, + "clientId": null, + "cleanSession": true, + "retainedMessage": false, + "ssl": false, + "credentials": { + "type": "anonymous" + } + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 949, + "layoutY": 153 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "save timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 0, + "skipLatestPersistence": false, + "useServerTs": false + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 305, + "layoutY": 151 + }, + "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", + "name": "swatch", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "version": 0 + }, + "externalId": null + } + ], + "connections": [ + { + "fromIndex": 0, + "toIndex": 1, + "type": "Success" + }, + { + "fromIndex": 2, + "toIndex": 0, + "type": "Post telemetry" + } + ], + "ruleChainConnections": null +} \ No newline at end of file diff --git a/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml new file mode 100644 index 0000000000..10675c9a23 --- /dev/null +++ b/msa/black-box-tests/src/test/resources/docker-compose.mosquitto.yml @@ -0,0 +1,25 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3.0' +services: + broker: + image: eclipse-mosquitto + volumes: + - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf + ports: + - "1883" + restart: always diff --git a/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf new file mode 100644 index 0000000000..7a40d338fc --- /dev/null +++ b/msa/black-box-tests/src/test/resources/mosquitto/mosquitto.conf @@ -0,0 +1,18 @@ +# +# Copyright © 2016-2023 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +listener 1883 +allow_anonymous true