added test for singleton mqtt node

This commit is contained in:
YevhenBondarenko 2023-12-13 23:13:05 +01:00
parent 1e2c7f9579
commit 65a8b0a34c
7 changed files with 378 additions and 0 deletions

View File

@ -107,6 +107,7 @@ public class ContainerTestSuite {
List<File> composeFiles = new ArrayList<>(Arrays.asList(
new File(targetDir + "docker-compose.yml"),
new File(targetDir + "docker-compose.volumes.yml"),
new File(targetDir + "docker-compose.mosquitto.yml"),
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")),
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid-test-extras.yml" : "docker-compose.postgres-test-extras.yml")),
new File(targetDir + "docker-compose.postgres.volumes.yml"),
@ -162,6 +163,7 @@ public class ContainerTestSuite {
.withEnv(queueEnv)
.withEnv("LOAD_BALANCER_NAME", "")
.withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.withExposedService("broker", 1883)
.waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
.waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))

View File

@ -48,4 +48,13 @@ public class TestProperties {
}
return System.getProperty("tb.wsUrl", "ws://localhost:8080");
}
public static String getMqttBrokerUrl() {
if (instance.isActive()) {
String host = instance.getTestContainer().getServiceHost("broker", 1883);
Integer port = instance.getTestContainer().getServicePort("broker", 1883);
return "tcp://" + host + ":" + port;
}
return System.getProperty("mqtt.broker", "tcp://localhost:1883");
}
}

View File

@ -31,10 +31,12 @@ import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
@ -45,9 +47,11 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
@ -527,4 +531,41 @@ public class TestRestClient {
.then()
.statusCode(HTTP_OK);
}
public PageData<EventInfo> getEvents(EntityId entityId, EventType eventType, TenantId tenantId, TimePageLink pageLink) {
Map<String, String> params = new HashMap<>();
params.put("entityType", entityId.getEntityType().name());
params.put("entityId", entityId.getId().toString());
params.put("eventType", eventType.name());
params.put("tenantId", tenantId.getId().toString());
addTimePageLinkToParam(params, pageLink);
return given().spec(requestSpec)
.get("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&" + getTimeUrlParams(pageLink), params)
.then()
.statusCode(HTTP_OK)
.extract()
.as(new TypeRef<>() {});
}
private void addTimePageLinkToParam(Map<String, String> params, TimePageLink pageLink) {
this.addPageLinkToParam(params, pageLink);
if (pageLink.getStartTime() != null) {
params.put("startTime", String.valueOf(pageLink.getStartTime()));
}
if (pageLink.getEndTime() != null) {
params.put("endTime", String.valueOf(pageLink.getEndTime()));
}
}
private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) {
urlParams += "&startTime={startTime}";
}
if (pageLink.getEndTime() != null) {
urlParams += "&endTime={endTime}";
}
return urlParams;
}
}

View File

@ -0,0 +1,204 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.rule.node;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.DisableUIListeners;
import org.thingsboard.server.msa.TestProperties;
import org.thingsboard.server.msa.WsClient;
import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.fail;
import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype;
@DisableUIListeners
@Slf4j
public class MqttNodeTest extends AbstractContainerTest {
private static final String TOPIC = "tb/mqtt/device";
private Device device;
@BeforeMethod
public void setUp() {
testRestClient.login("tenant@thingsboard.org", "tenant");
device = testRestClient.postDevice("", defaultDevicePrototype("mqtt_"));
}
@AfterMethod
public void tearDown() {
testRestClient.deleteDeviceIfExists(device.getId());
}
@Test
public void telemetryUpload() throws Exception {
RuleChainId defaultRuleChainId = getDefaultRuleChainId();
createRootRuleChainWithTestNode("MqttRuleNodeTestMetadata.json", "org.thingsboard.rule.engine.mqtt.TbMqttNode", 2);
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
MqttMessageListener messageListener = new MqttMessageListener();
MqttClient responseClient = new MqttClient(TestProperties.getMqttBrokerUrl(), StringUtils.randomAlphanumeric(10), new MemoryPersistence());
responseClient.connect();
responseClient.subscribe(TOPIC, messageListener);
MqttClient mqttClient = new MqttClient("tcp://localhost:1883", StringUtils.randomAlphanumeric(10), new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(deviceCredentials.getCredentialsId());
mqttClient.connect(mqttConnectOptions);
mqttClient.publish("v1/devices/me/telemetry", new MqttMessage(createPayload().toString().getBytes()));
WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage();
log.info("Received telemetry: {}", actualLatestTelemetry);
wsClient.closeBlocking();
assertThat(actualLatestTelemetry.getData()).hasSize(4);
assertThat(actualLatestTelemetry.getLatestValues().keySet()).containsOnlyOnceElementsOf(Arrays.asList("booleanKey", "stringKey", "doubleKey", "longKey"));
assertThat(actualLatestTelemetry.getDataValuesByKey("booleanKey").get(1)).isEqualTo(Boolean.TRUE.toString());
assertThat(actualLatestTelemetry.getDataValuesByKey("stringKey").get(1)).isEqualTo("value1");
assertThat(actualLatestTelemetry.getDataValuesByKey("doubleKey").get(1)).isEqualTo(Double.toString(42.0));
assertThat(actualLatestTelemetry.getDataValuesByKey("longKey").get(1)).isEqualTo(Long.toString(73));
Awaitility
.await()
.alias("Get integration events")
.atMost(10, TimeUnit.SECONDS)
.until(() -> messageListener.getEvents().size() > 0);
BlockingQueue<MqttEvent> events = messageListener.getEvents();
JsonNode actual = JacksonUtil.toJsonNode(Objects.requireNonNull(events.poll()).message);
assertThat(actual.get("stringKey").asText()).isEqualTo("value1");
assertThat(actual.get("booleanKey").asBoolean()).isEqualTo(Boolean.TRUE);
assertThat(actual.get("doubleKey").asDouble()).isEqualTo(42.0);
assertThat(actual.get("longKey").asLong()).isEqualTo(73);
testRestClient.setRootRuleChain(defaultRuleChainId);
}
@Data
private class MqttMessageListener implements IMqttMessageListener {
private final BlockingQueue<MqttEvent> events;
private MqttMessageListener() {
events = new ArrayBlockingQueue<>(100);
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
log.info("MQTT message [{}], topic [{}]", mqttMessage.toString(), s);
events.add(new MqttEvent(s, mqttMessage.toString()));
}
public BlockingQueue<MqttEvent> getEvents() {
return events;
}
}
@Data
private class MqttEvent {
private final String topic;
private final String message;
}
private RuleChainId getDefaultRuleChainId() {
PageData<RuleChain> ruleChains = testRestClient.getRuleChains(new PageLink(40, 0));
Optional<RuleChain> defaultRuleChain = ruleChains.getData()
.stream()
.filter(RuleChain::isRoot)
.findFirst();
if (!defaultRuleChain.isPresent()) {
fail("Root rule chain wasn't found");
}
return defaultRuleChain.get().getId();
}
protected RuleChainId createRootRuleChainWithTestNode(String ruleChainMetadataFile, String ruleNodeType, int eventsCount) throws Exception {
RuleChain newRuleChain = new RuleChain();
newRuleChain.setName("testRuleChain");
RuleChain ruleChain = testRestClient.postRuleChain(newRuleChain);
JsonNode configuration = JacksonUtil.OBJECT_MAPPER.readTree(this.getClass().getClassLoader().getResourceAsStream(ruleChainMetadataFile));
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(ruleChain.getId());
ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
ruleChainMetaData.setNodes(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("nodes"), RuleNode[].class)));
ruleChainMetaData.setConnections(Arrays.asList(JacksonUtil.OBJECT_MAPPER.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
ruleChainMetaData = testRestClient.postRuleChainMetadata(ruleChainMetaData);
testRestClient.setRootRuleChain(ruleChain.getId());
RuleNode node = ruleChainMetaData.getNodes().stream().filter(ruleNode -> ruleNode.getType().equals(ruleNodeType)).findFirst().get();
Awaitility
.await()
.alias("Get events from rule chain")
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
PageData<EventInfo> events = testRestClient.getEvents(node.getId(), EventType.LC_EVENT, ruleChain.getTenantId(), new TimePageLink(1024));
List<EventInfo> eventInfos = events.getData().stream().filter(eventInfo ->
"STARTED".equals(eventInfo.getBody().get("event").asText()) &&
"true".equals(eventInfo.getBody().get("success").asText()))
.collect(Collectors.toList());
return eventInfos.size() == eventsCount;
});
return ruleChain.getId();
}
}

View File

@ -0,0 +1,79 @@
{
"firstNodeIndex": 2,
"nodes": [
{
"additionalInfo": {
"description": "",
"layoutX": 626,
"layoutY": 152
},
"type": "org.thingsboard.rule.engine.mqtt.TbMqttNode",
"name": "test mqtt",
"debugMode": true,
"singletonMode": true,
"queueName": "HighPriority",
"configurationVersion": 0,
"configuration": {
"topicPattern": "tb/mqtt/device",
"host": "broker",
"port": 1883,
"connectTimeoutSec": 10,
"clientId": null,
"cleanSession": true,
"retainedMessage": false,
"ssl": false,
"credentials": {
"type": "anonymous"
}
},
"externalId": null
},
{
"additionalInfo": {
"description": "",
"layoutX": 949,
"layoutY": 153
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "save timeseries",
"debugMode": true,
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"defaultTTL": 0,
"skipLatestPersistence": false,
"useServerTs": false
},
"externalId": null
},
{
"additionalInfo": {
"description": "",
"layoutX": 305,
"layoutY": 151
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "swatch",
"debugMode": false,
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"version": 0
},
"externalId": null
}
],
"connections": [
{
"fromIndex": 0,
"toIndex": 1,
"type": "Success"
},
{
"fromIndex": 2,
"toIndex": 0,
"type": "Post telemetry"
}
],
"ruleChainConnections": null
}

View File

@ -0,0 +1,25 @@
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '3.0'
services:
broker:
image: eclipse-mosquitto
volumes:
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
ports:
- "1883"
restart: always

View File

@ -0,0 +1,18 @@
#
# Copyright © 2016-2023 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
listener 1883
allow_anonymous true