Cover MQTT API with black box tests

This commit is contained in:
Viacheslav Kukhtyn 2018-10-23 16:41:43 +03:00
parent 67b8c2f4bf
commit fb098e1bae
10 changed files with 483 additions and 56 deletions

View File

@ -15,4 +15,4 @@ As result, in REPOSITORY column, next images should be present:
- Run the integration tests in the [msa/integration-tests](../integration-tests) directory:
mvn clean install -Dintegrationtests.skip=false
mvn clean install -DintegrationTests.skip=false

View File

@ -34,9 +34,10 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
<integrationtests.skip>true</integrationtests.skip>
<integrationTests.skip>true</integrationTests.skip>
<testcontainers.version>1.9.1</testcontainers.version>
<java-websocket.version>1.3.9</java-websocket.version>
<httpclient.version>4.5.6</httpclient.version>
</properties>
<dependencies>
@ -50,6 +51,11 @@
<artifactId>Java-WebSocket</artifactId>
<version>${java-websocket.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>io.takari.junit</groupId>
<artifactId>takari-cpsuite</artifactId>
@ -89,7 +95,7 @@
<includes>
<include>**/*TestSuite.java</include>
</includes>
<skipTests>${integrationtests.skip}</skipTests>
<skipTests>${integrationTests.skip}</skipTests>
</configuration>
</plugin>
</plugins>

View File

@ -21,55 +21,68 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.conn.ssl.X509HostnameVerifier;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.junit.*;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.thingsboard.client.tools.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import javax.net.ssl.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Slf4j
public abstract class AbstractContainerTest {
protected static String httpUrl;
protected static String wsUrl;
protected static final String HTTPS_URL = "https://localhost";
protected static final String WSS_URL = "wss://localhost";
protected static RestClient restClient;
protected ObjectMapper mapper = new ObjectMapper();
@BeforeClass
public static void before() {
httpUrl = "http://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
wsUrl = "ws://localhost:" + ContainerTestSuite.composeContainer.getServicePort("tb-web-ui1", ContainerTestSuite.EXPOSED_PORT);
restClient = new RestClient(httpUrl);
public static void before() throws Exception {
restClient = new RestClient(HTTPS_URL);
restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
}
protected Device createDevice(String name) {
return restClient.createDevice(name + RandomStringUtils.randomAlphanumeric(7), "DEFAULT");
}
protected WsClient subscribeToTelemetryWebSocket(DeviceId deviceId) throws URISyntaxException, InterruptedException {
WsClient mWs = new WsClient(new URI(wsUrl + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
mWs.connectBlocking(1, TimeUnit.SECONDS);
protected WsClient subscribeToWebSocket(DeviceId deviceId, String scope, CmdsType property) throws Exception {
WsClient wsClient = new WsClient(new URI(WSS_URL + "/api/ws/plugins/telemetry?token=" + restClient.getToken()));
SSLContextBuilder builder = SSLContexts.custom();
builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
wsClient.setSocket(builder.build().getSocketFactory().createSocket());
wsClient.connectBlocking();
JsonObject tsSubCmd = new JsonObject();
tsSubCmd.addProperty("entityType", EntityType.DEVICE.name());
tsSubCmd.addProperty("entityId", deviceId.toString());
tsSubCmd.addProperty("scope", "LATEST_TELEMETRY");
tsSubCmd.addProperty("cmdId", new Random().nextInt(100));
tsSubCmd.addProperty("unsubscribe", false);
JsonArray wsTsSubCmds = new JsonArray();
wsTsSubCmds.add(tsSubCmd);
JsonObject cmdsObject = new JsonObject();
cmdsObject.addProperty("entityType", EntityType.DEVICE.name());
cmdsObject.addProperty("entityId", deviceId.toString());
cmdsObject.addProperty("scope", scope);
cmdsObject.addProperty("cmdId", new Random().nextInt(100));
JsonArray cmd = new JsonArray();
cmd.add(cmdsObject);
JsonObject wsRequest = new JsonObject();
wsRequest.add("tsSubCmds", wsTsSubCmds);
wsRequest.add("historyCmds", new JsonArray());
wsRequest.add("attrSubCmds", new JsonArray());
mWs.send(wsRequest.toString());
return mWs;
wsRequest.add(property.toString(), cmd);
wsClient.send(wsRequest.toString());
return wsClient;
}
protected Map<String, Long> getExpectedLatestValues(long ts) {
@ -109,4 +122,54 @@ public abstract class AbstractContainerTest {
return values;
}
protected enum CmdsType {
TS_SUB_CMDS("tsSubCmds"),
HISTORY_CMDS("historyCmds"),
ATTR_SUB_CMDS("attrSubCmds");
private final String text;
CmdsType(final String text) {
this.text = text;
}
@Override
public String toString() {
return text;
}
}
private static HttpComponentsClientHttpRequestFactory getRequestFactoryForSelfSignedCert() throws Exception {
SSLContextBuilder builder = SSLContexts.custom();
builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true);
SSLContext sslContext = builder.build();
SSLConnectionSocketFactory sslSelfSigned = new SSLConnectionSocketFactory(sslContext, new X509HostnameVerifier() {
@Override
public void verify(String host, SSLSocket ssl) {
}
@Override
public void verify(String host, X509Certificate cert) {
}
@Override
public void verify(String host, String[] cns, String[] subjectAlts) {
}
@Override
public boolean verify(String s, SSLSession sslSession) {
return true;
}
});
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder
.<ConnectionSocketFactory>create()
.register("https", sslSelfSigned)
.build();
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(cm).build();
return new HttpComponentsClientHttpRequestFactory(httpClient);
}
}

View File

@ -26,12 +26,11 @@ import java.io.File;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.msa.*"})
public class ContainerTestSuite {
static final int EXPOSED_PORT = 8080;
@ClassRule
public static DockerComposeContainer composeContainer = new DockerComposeContainer(new File("./../docker/docker-compose.yml"))
.withPull(false)
.withLocalCompose(true)
.withTailChildContainers(true)
.withExposedService("tb-web-ui1", EXPOSED_PORT, Wait.forHttp("/login"));
.withExposedService("tb-web-ui1", 8080, Wait.forHttp("/login"));
}

View File

@ -19,16 +19,12 @@ import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class WsClient extends WebSocketClient {
private final BlockingQueue<String> events;
private String message;
public WsClient(URI serverUri) {
super(serverUri);
events = new ArrayBlockingQueue<>(100);
}
@Override
@ -37,13 +33,11 @@ public class WsClient extends WebSocketClient {
@Override
public void onMessage(String message) {
events.add(message);
this.message = message;
}
@Override
public void onClose(int code, String reason, boolean remote) {
events.clear();
}
@Override
@ -54,4 +48,4 @@ public class WsClient extends WebSocketClient {
public String getLastMessage() {
return this.message;
}
}
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.msa.connectivity;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.http.ResponseEntity;
@ -22,7 +23,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.WsClient;
import org.thingsboard.server.msa.WsTelemetryResponse;
import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.util.concurrent.TimeUnit;
@ -35,23 +36,24 @@ public class HttpClientTest extends AbstractContainerTest {
Device device = createDevice("http_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
ResponseEntity deviceTelemetryResponse = restClient.getRestTemplate()
.postForEntity(httpUrl + "/api/v1/{credentialsId}/telemetry",
.postForEntity(HTTPS_URL + "/api/v1/{credentialsId}/telemetry",
mapper.readTree(createPayload().toString()),
ResponseEntity.class,
deviceCredentials.getCredentialsId());
Assert.assertTrue(deviceTelemetryResponse.getStatusCode().is2xxSuccessful());
TimeUnit.SECONDS.sleep(1);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
actualLatestTelemetry.getLatestValues().keySet());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
}

View File

@ -15,20 +15,39 @@
*/
package org.thingsboard.server.msa.connectivity;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Data;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttHandler;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.AbstractContainerTest;
import org.thingsboard.server.msa.WsClient;
import org.thingsboard.server.msa.WsTelemetryResponse;
import org.thingsboard.server.msa.mapper.AttributesResponse;
import org.thingsboard.server.msa.mapper.WsTelemetryResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
public class MqttClientTest extends AbstractContainerTest {
@ -39,20 +58,22 @@ public class MqttClientTest extends AbstractContainerTest {
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
MqttClient mqttClient = getMqttClient(deviceCredentials);
WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload().toString().getBytes()));
TimeUnit.SECONDS.sleep(1);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
Assert.assertEquals(getExpectedLatestValues(123456789L).keySet(), actualLatestTelemetry.getLatestValues().keySet());
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
Assert.assertEquals(Sets.newHashSet("booleanKey", "stringKey", "doubleKey", "longKey"),
actualLatestTelemetry.getLatestValues().keySet());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", Boolean.TRUE.toString()));
Assert.assertTrue(verify(actualLatestTelemetry, "stringKey", "value1"));
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", Long.toString(73)));
restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
@ -63,12 +84,13 @@ public class MqttClientTest extends AbstractContainerTest {
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
WsClient mWs = subscribeToTelemetryWebSocket(device.getId());
MqttClient mqttClient = getMqttClient(deviceCredentials);
WsClient wsClient = subscribeToWebSocket(device.getId(), "LATEST_TELEMETRY", CmdsType.TS_SUB_CMDS);
MqttClient mqttClient = getMqttClient(deviceCredentials, null);
mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer(createPayload(ts).toString().getBytes()));
TimeUnit.SECONDS.sleep(1);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(mWs.getLastMessage(), WsTelemetryResponse.class);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
Assert.assertEquals(getExpectedLatestValues(ts), actualLatestTelemetry.getLatestValues());
Assert.assertTrue(verify(actualLatestTelemetry, "booleanKey", ts, Boolean.TRUE.toString()));
@ -76,15 +98,271 @@ public class MqttClientTest extends AbstractContainerTest {
Assert.assertTrue(verify(actualLatestTelemetry, "doubleKey", ts, Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "longKey", ts, Long.toString(73)));
restClient.getRestTemplate().delete(httpUrl + "/api/device/" + device.getId());
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials) throws InterruptedException {
MqttMessageListener queue = new MqttMessageListener();
@Test
public void publishAttributeUpdateToServer() throws Exception {
restClient.login("tenant@thingsboard.org", "tenant");
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
WsClient wsClient = subscribeToWebSocket(device.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
JsonObject clientAttributes = new JsonObject();
clientAttributes.addProperty("attr1", "value1");
clientAttributes.addProperty("attr2", true);
clientAttributes.addProperty("attr3", 42.0);
clientAttributes.addProperty("attr4", 73);
mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
TimeUnit.SECONDS.sleep(1);
WsTelemetryResponse actualLatestTelemetry = mapper.readValue(wsClient.getLastMessage(), WsTelemetryResponse.class);
Assert.assertEquals(4, actualLatestTelemetry.getData().size());
Assert.assertEquals(Sets.newHashSet("attr1", "attr2", "attr3", "attr4"),
actualLatestTelemetry.getLatestValues().keySet());
Assert.assertTrue(verify(actualLatestTelemetry, "attr1", "value1"));
Assert.assertTrue(verify(actualLatestTelemetry, "attr2", Boolean.TRUE.toString()));
Assert.assertTrue(verify(actualLatestTelemetry, "attr3", Double.toString(42.0)));
Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
public void requestAttributeValuesFromServer() throws Exception {
restClient.login("tenant@thingsboard.org", "tenant");
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
// Add a new client attribute
JsonObject clientAttributes = new JsonObject();
String clientAttributeValue = RandomStringUtils.randomAlphanumeric(8);
clientAttributes.addProperty("clientAttr", clientAttributeValue);
mqttClient.publish("v1/devices/me/attributes", Unpooled.wrappedBuffer(clientAttributes.toString().getBytes()));
// Add a new shared attribute
JsonObject sharedAttributes = new JsonObject();
String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
sharedAttributes.addProperty("sharedAttr", sharedAttributeValue);
ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
device.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
// Subscribe to attributes response
mqttClient.on("v1/devices/me/attributes/response/+", listener);
// Request attributes
JsonObject request = new JsonObject();
request.addProperty("clientKeys", "clientAttr");
request.addProperty("sharedKeys", "sharedAttr");
mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes()));
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
Assert.assertEquals(1, attributes.getClient().size());
Assert.assertEquals(clientAttributeValue, attributes.getClient().get("clientAttr"));
Assert.assertEquals(1, attributes.getShared().size());
Assert.assertEquals(sharedAttributeValue, attributes.getShared().get("sharedAttr"));
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
public void subscribeToAttributeUpdatesFromServer() throws Exception {
restClient.login("tenant@thingsboard.org", "tenant");
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
mqttClient.on("v1/devices/me/attributes", listener);
String sharedAttributeName = "sharedAttr";
// Add a new shared attribute
JsonObject sharedAttributes = new JsonObject();
String sharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue);
ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
device.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
Assert.assertEquals(sharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
// Update the shared attribute value
JsonObject updatedSharedAttributes = new JsonObject();
String updatedSharedAttributeValue = RandomStringUtils.randomAlphanumeric(8);
updatedSharedAttributes.addProperty(sharedAttributeName, updatedSharedAttributeValue);
ResponseEntity updatedSharedAttributesResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
mapper.readTree(updatedSharedAttributes.toString()), ResponseEntity.class,
device.getId());
Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
Assert.assertEquals(updatedSharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
public void serverSideRpc() throws Exception {
restClient.login("tenant@thingsboard.org", "tenant");
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
mqttClient.on("v1/devices/me/rpc/request/+", listener);
// Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject();
serverRpcPayload.addProperty("method", "getValue");
serverRpcPayload.addProperty("params", true);
serverRpcPayload.addProperty("timeout", 1000);
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<ResponseEntity> future = service.submit(() -> {
try {
return restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/plugins/rpc/twoway/{deviceId}",
mapper.readTree(serverRpcPayload.toString()), String.class,
device.getId());
} catch (IOException e) {
return ResponseEntity.badRequest().build();
}
});
// Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
JsonObject clientResponse = new JsonObject();
clientResponse.addProperty("response", "someResponse");
// Send a response to the server's RPC request
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes()));
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
@Test
public void clientSideRpc() throws Exception {
restClient.login("tenant@thingsboard.org", "tenant");
Device device = createDevice("mqtt_");
DeviceCredentials deviceCredentials = restClient.getCredentials(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
mqttClient.on("v1/devices/me/rpc/request/+", listener);
// Get the default rule chain id to make it root again after test finished
RuleChainId defaultRuleChainId = getDefaultRuleChainId();
// Create a new root rule chain
RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
// Send the request to the server
JsonObject clientRequest = new JsonObject();
clientRequest.addProperty("method", "getResponse");
clientRequest.addProperty("params", true);
Integer requestId = 42;
mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes()));
// Check the response from the server
TimeUnit.SECONDS.sleep(1);
MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS);
Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
Assert.assertEquals(requestId, responseId);
Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());
// Make the default rule chain a root again
ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
null,
RuleChain.class,
defaultRuleChainId);
Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
// Delete the created rule chain
restClient.getRestTemplate().delete(HTTPS_URL + "/api/ruleChain/{ruleChainId}", ruleChainId);
restClient.getRestTemplate().delete(HTTPS_URL + "/api/device/" + device.getId());
}
private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
RuleChain newRuleChain = new RuleChain();
newRuleChain.setName("testRuleChain");
ResponseEntity<RuleChain> ruleChainResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/ruleChain",
newRuleChain,
RuleChain.class);
Assert.assertTrue(ruleChainResponse.getStatusCode().is2xxSuccessful());
RuleChain ruleChain = ruleChainResponse.getBody();
JsonNode configuration = mapper.readTree(this.getClass().getClassLoader().getResourceAsStream("RpcResponseRuleChainMetadata.json"));
RuleChainMetaData ruleChainMetaData = new RuleChainMetaData();
ruleChainMetaData.setRuleChainId(ruleChain.getId());
ruleChainMetaData.setFirstNodeIndex(configuration.get("firstNodeIndex").asInt());
ruleChainMetaData.setNodes(Arrays.asList(mapper.treeToValue(configuration.get("nodes"), RuleNode[].class)));
ruleChainMetaData.setConnections(Arrays.asList(mapper.treeToValue(configuration.get("connections"), NodeConnectionInfo[].class)));
ResponseEntity<RuleChainMetaData> ruleChainMetadataResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/ruleChain/metadata",
ruleChainMetaData,
RuleChainMetaData.class);
Assert.assertTrue(ruleChainMetadataResponse.getStatusCode().is2xxSuccessful());
// Set a new rule chain as root
ResponseEntity<RuleChain> rootRuleChainResponse = restClient.getRestTemplate()
.postForEntity(HTTPS_URL + "/api/ruleChain/{ruleChainId}/root",
null,
RuleChain.class,
ruleChain.getId());
Assert.assertTrue(rootRuleChainResponse.getStatusCode().is2xxSuccessful());
return ruleChain.getId();
}
private RuleChainId getDefaultRuleChainId() {
ResponseEntity<TextPageData<RuleChain>> ruleChains = restClient.getRestTemplate().exchange(
HTTPS_URL + "/api/ruleChains?limit=40&textSearch=",
HttpMethod.GET,
null,
new ParameterizedTypeReference<TextPageData<RuleChain>>() {
});
Optional<RuleChain> defaultRuleChain = ruleChains.getBody().getData()
.stream()
.filter(RuleChain::isRoot)
.findFirst();
if (!defaultRuleChain.isPresent()) {
Assert.fail("Root rule chain wasn't found");
}
return defaultRuleChain.get().getId();
}
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException {
MqttClientConfig clientConfig = new MqttClientConfig();
clientConfig.setClientId("MQTT client from test");
clientConfig.setUsername(deviceCredentials.getCredentialsId());
MqttClient mqttClient = MqttClient.create(clientConfig, queue);
MqttClient mqttClient = MqttClient.create(clientConfig, listener);
mqttClient.connect("localhost", 1883).sync();
return mqttClient;
}

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.mapper;
import lombok.Data;
import java.util.Map;
@Data
public class AttributesResponse {
private Map<String, Object> client;
private Map<String, Object> shared;
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa;
package org.thingsboard.server.msa.mapper;
import lombok.Data;

View File

@ -0,0 +1,59 @@
{
"firstNodeIndex": 0,
"nodes": [
{
"additionalInfo": {
"layoutX": 325,
"layoutY": 150
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "msgTypeSwitch",
"debugMode": true,
"configuration": {
"version": 0
}
},
{
"additionalInfo": {
"layoutX": 60,
"layoutY": 300
},
"type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
"name": "formResponse",
"debugMode": true,
"configuration": {
"jsScript": "if (msg.method == \"getResponse\") {\n return {msg: {\"response\": \"requestReceived\"}, metadata: metadata, msgType: msgType};\n}\n\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
}
},
{
"additionalInfo": {
"layoutX": 450,
"layoutY": 300
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCReplyNode",
"name": "rpcReply",
"debugMode": true,
"configuration": {
"requestIdMetaDataAttribute": "requestId"
}
}
],
"connections": [
{
"fromIndex": 0,
"toIndex": 1,
"type": "RPC Request from Device"
},
{
"fromIndex": 1,
"toIndex": 2,
"type": "Success"
},
{
"fromIndex": 1,
"toIndex": 2,
"type": "Failure"
}
],
"ruleChainConnections": null
}