From 2cccbc7dca182c0456f50e634b61cef1ec93f9ff Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Wed, 10 Oct 2018 20:00:33 +0300 Subject: [PATCH] MQTT tests: fix CountDown latch --- .../AbstractMqttServerSideRpcIntegrationTest.java | 13 ++++++++----- .../AbstractMqttTelemetryIntegrationTest.java | 10 ++++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 74ecdb2a76..7453682644 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -103,10 +103,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC options.setUserName(accessToken); client.connect(options).waitForCompletion(); - TestMqttCallback callback = new TestMqttCallback(client); - client.setCallback(callback); CountDownLatch latch = new CountDownLatch(1); - latch.countDown(); + TestMqttCallback callback = new TestMqttCallback(client, latch); + client.setCallback(callback); + client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value()); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; @@ -163,7 +163,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC options.setUserName(accessToken); client.connect(options).waitForCompletion(); client.subscribe("v1/devices/me/rpc/request/+", 1); - client.setCallback(new TestMqttCallback(client)); + client.setCallback(new TestMqttCallback(client, new CountDownLatch(1))); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); @@ -211,10 +211,12 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC private static class TestMqttCallback implements MqttCallback { private final MqttAsyncClient client; + private final CountDownLatch latch; private Integer qoS; - TestMqttCallback(MqttAsyncClient client) { + TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) { this.client = client; + this.latch = latch; } int getQoS() { @@ -233,6 +235,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); qoS = mqttMessage.getQos(); client.publish(responseTopic, message); + latch.countDown(); } @Override diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java index df9601e735..f7a7ceafd6 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java @@ -105,10 +105,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(accessToken); client.connect(options).waitForCompletion(3000); - TestMqttCallback callback = new TestMqttCallback(client); - client.setCallback(callback); CountDownLatch latch = new CountDownLatch(1); - latch.countDown(); + TestMqttCallback callback = new TestMqttCallback(client, latch); + client.setCallback(callback); client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); String payload = "{\"key\":\"value\"}"; String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); @@ -120,6 +119,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr private static class TestMqttCallback implements MqttCallback { private final MqttAsyncClient client; + private final CountDownLatch latch; private Integer qoS; private String payload; @@ -127,8 +127,9 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr return payload; } - TestMqttCallback(MqttAsyncClient client) { + TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) { this.client = client; + this.latch = latch; } int getQoS() { @@ -143,6 +144,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr public void messageArrived(String requestTopic, MqttMessage mqttMessage) { payload = new String(mqttMessage.getPayload()); qoS = mqttMessage.getQos(); + latch.countDown(); } @Override