From 69e2eef64ddbd0ee7206e4ba7f647fa0ef7902c2 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 11 Jul 2022 17:46:43 +0300 Subject: [PATCH 1/5] Fix TbPubSubSubscriptionSettings - added default values. Fixed WsClient - throw exception properly on latch timeout. Tuned timeouts for tests --- .../pubsub/TbPubSubSubscriptionSettings.java | 8 ++++---- docker/.env | 2 +- .../server/msa/ContainerTestSuite.java | 11 +++++++++-- .../java/org/thingsboard/server/msa/WsClient.java | 15 ++++++++++++--- .../server/msa/connectivity/MqttClientTest.java | 6 +++--- .../msa/connectivity/MqttGatewayClientTest.java | 2 ++ msa/pom.xml | 1 + 7 files changed, 32 insertions(+), 13 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java index a586e4f6fa..1f5430a0bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java @@ -30,13 +30,13 @@ import java.util.Map; public class TbPubSubSubscriptionSettings { @Value("${queue.pubsub.queue-properties.core:}") private String coreProperties; - @Value("${queue.pubsub.queue-properties.rule-engine}") + @Value("${queue.pubsub.queue-properties.rule-engine:}") private String ruleEngineProperties; - @Value("${queue.pubsub.queue-properties.transport-api}") + @Value("${queue.pubsub.queue-properties.transport-api:}") private String transportApiProperties; - @Value("${queue.pubsub.queue-properties.notifications}") + @Value("${queue.pubsub.queue-properties.notifications:}") private String notificationsProperties; - @Value("${queue.pubsub.queue-properties.js-executor}") + @Value("${queue.pubsub.queue-properties.js-executor:}") private String jsExecutorProperties; @Value("${queue.pubsub.queue-properties.version-control:}") private String vcProperties; diff --git a/docker/.env b/docker/.env index f33ed50da5..27653a19ad 100644 --- a/docker/.env +++ b/docker/.env @@ -1,4 +1,4 @@ -TB_QUEUE_TYPE=kafka +TB_QUEUE_TYPE=pubsub # redis or redis-cluster CACHE=redis diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index ffd2e1f50a..f61ae550d0 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,14 +107,21 @@ public class ContainerTestSuite { case "aws-sqs": replaceInFile(targetDir, "queue-aws-sqs.env", Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"), - "YOUR_SECRET", "blackBoxTests.awsSecret", - "YOUR_REGION", "blackBoxTests.awsRegion")); + "YOUR_SECRET", getSysProp("blackBoxTests.awsSecret"), + "YOUR_REGION", getSysProp("blackBoxTests.awsRegion"))); break; case "rabbitmq": composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml")); replaceInFile(targetDir, "queue-rabbitmq.env", Map.of("localhost", "rabbitmq")); break; + case "service-bus": + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_NAMESPACE_NAME", getSysProp("blackBoxTests.serviceBusNamespace"), + "YOUR_SAS_KEY_NAME", getSysProp("blackBoxTests.serviceBusSASPolicy"))); + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey"))); + break; default: throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 547ebaf575..0f75775808 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -74,8 +74,13 @@ public class WsClient extends WebSocketClient { public WsTelemetryResponse getLastMessage() { try { - latch.await(10, TimeUnit.SECONDS); - return this.message; + boolean result = latch.await(120, TimeUnit.SECONDS); + if (result) { + return this.message; + } else { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); } @@ -84,7 +89,11 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - firstReply.await(10, TimeUnit.SECONDS); + boolean result = firstReply.await(10, TimeUnit.SECONDS); + if (!result) { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException(e); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 336d75bb45..97e1836d06 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -277,7 +277,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -320,8 +320,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(1); - MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(10); + MqttEvent responseFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index ec77cbec36..d66db480ef 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -410,6 +410,8 @@ public class MqttGatewayClientTest extends AbstractContainerTest { String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); + TimeUnit.SECONDS.sleep(60); + List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); Assert.assertEquals(1, relations.size()); diff --git a/msa/pom.xml b/msa/pom.xml index dd40348bc8..d32fceca68 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -46,6 +46,7 @@ web-ui tb-node transport + black-box-tests From bf5f80c925d62667c1fda6a7c6f32196c36fae79 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 12 Jul 2022 21:39:13 +0300 Subject: [PATCH 2/5] Added pubsub queue support in bbt. Increased timeouts for slow queues --- .../server/msa/ContainerTestSuite.java | 5 ++++ .../msa/connectivity/HttpClientTest.java | 2 +- .../msa/connectivity/MqttClientTest.java | 19 ++++++-------- .../connectivity/MqttGatewayClientTest.java | 26 ++++++++++--------- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index f61ae550d0..47acbde16e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -122,6 +122,11 @@ public class ContainerTestSuite { replaceInFile(targetDir, "queue-service-bus.env", Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey"))); break; + case "pubsub": + replaceInFile(targetDir, "queue-pubsub.env", + Map.of("YOUR_PROJECT_ID", getSysProp("blackBoxTests.pubSubProjectId"), + "YOUR_SERVICE_ACCOUNT", getSysProp("blackBoxTests.pubSubServiceAccount"))); + break; default: throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index a774888af5..c721b08f5e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest { Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); @SuppressWarnings("deprecation") Optional allOptional = restClient.getAttributes(accessToken, null, null); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 97e1836d06..9d6ef02c0c 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,9 +28,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.junit.*; -import org.junit.rules.TestRule; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; @@ -181,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(60, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -212,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); String sharedAttributeName = "sharedAttr"; @@ -240,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(60, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -258,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -287,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(15, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); @@ -311,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest { // Create a new root rule chain RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); // Send the request to the server JsonObject clientRequest = new JsonObject(); clientRequest.addProperty("method", "getResponse"); @@ -350,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClient mqttClient = getMqttClient(deviceCredentials, listener); restClient.deleteDevice(device.getId()); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); Assert.assertFalse(mqttClient.isConnected()); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index d66db480ef..6ead803b76 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - var event = listener.getEvents().poll(10, TimeUnit.SECONDS); + var event = listener.getEvents().poll(60, TimeUnit.SECONDS); JsonObject requestData = new JsonObject(); requestData.addProperty("id", 1); @@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(60, TimeUnit.SECONDS); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("value")); @@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(60, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(60, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent sharedAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent sharedAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); // Catch attribute update event Assert.assertNotNull(sharedAttributeEvent); @@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); checkAttribute(true, clientAttributeValue); checkAttribute(false, sharedAttributeValue); @@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { public void subscribeToAttributeUpdatesFromServer() throws Exception { mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); String sharedAttributeName = "sharedAttr"; // Add a new shared attribute @@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(60, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); } @@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(30); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertNotNull(requestFromServer); @@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { gatewayAttributesRequest.addProperty("key", attributeName); log.info(gatewayAttributesRequest.toString()); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); - MqttEvent clientAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent clientAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); Assert.assertNotNull(clientAttributeEvent); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); @@ -407,10 +407,12 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { + TimeUnit.SECONDS.sleep(30); + String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); - TimeUnit.SECONDS.sleep(60); + TimeUnit.SECONDS.sleep(30); List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); From 8341002d5b2008a16ed016292bbc28bf88c7a085 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 12 Jul 2022 21:42:35 +0300 Subject: [PATCH 3/5] Revert default queue type --- docker/.env | 2 +- msa/pom.xml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docker/.env b/docker/.env index 27653a19ad..f33ed50da5 100644 --- a/docker/.env +++ b/docker/.env @@ -1,4 +1,4 @@ -TB_QUEUE_TYPE=pubsub +TB_QUEUE_TYPE=kafka # redis or redis-cluster CACHE=redis diff --git a/msa/pom.xml b/msa/pom.xml index d32fceca68..dd40348bc8 100644 --- a/msa/pom.xml +++ b/msa/pom.xml @@ -46,7 +46,6 @@ web-ui tb-node transport - black-box-tests From c838c7ba4ee4b75ebd254672923eecd69929f250 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 12 Jul 2022 21:54:13 +0300 Subject: [PATCH 4/5] Added slowQueue parameter to tune timeouts in runtime --- .../server/msa/AbstractContainerTest.java | 6 ++++ .../org/thingsboard/server/msa/WsClient.java | 2 +- .../msa/connectivity/HttpClientTest.java | 2 +- .../msa/connectivity/MqttClientTest.java | 24 ++++++------- .../connectivity/MqttGatewayClientTest.java | 34 +++++++++++-------- 5 files changed, 39 insertions(+), 29 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index 5ea04de462..ffdcb71c9e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -69,6 +69,9 @@ public abstract class AbstractContainerTest { protected static final String WSS_URL = "wss://localhost"; protected static String TB_TOKEN; protected static RestClient restClient; + + protected static boolean slowQueue; + protected ObjectMapper mapper = new ObjectMapper(); protected JsonParser jsonParser = new JsonParser(); @@ -76,6 +79,9 @@ public abstract class AbstractContainerTest { public static void before() throws Exception { restClient = new RestClient(HTTPS_URL); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); + + String QUEUE_TYPE = System.getProperty("blackBoxTests.queue", "kafka"); + slowQueue = !"kafka".equals(QUEUE_TYPE); } @Rule diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 0f75775808..4da9fe2b24 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -89,7 +89,7 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - boolean result = firstReply.await(10, TimeUnit.SECONDS); + boolean result = firstReply.await(120, TimeUnit.SECONDS); if (!result) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException("Timeout, ws message wasn't received"); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index c721b08f5e..bd7a9d2e38 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest { Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); @SuppressWarnings("deprecation") Optional allOptional = restClient.getAttributes(accessToken, null, null); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 9d6ef02c0c..ffdbcec0e0 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -178,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); - MqttEvent event = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -209,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); String sharedAttributeName = "sharedAttr"; @@ -223,7 +223,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -237,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -255,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -274,7 +274,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -284,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(15, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); @@ -308,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest { // Create a new root rule chain RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send the request to the server JsonObject clientRequest = new JsonObject(); clientRequest.addProperty("method", "getResponse"); @@ -317,8 +317,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(10); - MqttEvent responseFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(slowQueue ? 10 : 1); + MqttEvent responseFromServer = listener.getEvents().poll(slowQueue ? 10 : 1, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); @@ -347,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClient mqttClient = getMqttClient(deviceCredentials, listener); restClient.deleteDevice(device.getId()); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); Assert.assertFalse(mqttClient.isConnected()); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 6ead803b76..032b4a3b45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - var event = listener.getEvents().poll(60, TimeUnit.SECONDS); + var event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); JsonObject requestData = new JsonObject(); requestData.addProperty("id", 1); @@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("value")); @@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent sharedAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent sharedAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); // Catch attribute update event Assert.assertNotNull(sharedAttributeEvent); @@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); checkAttribute(true, clientAttributeValue); checkAttribute(false, sharedAttributeValue); @@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { public void subscribeToAttributeUpdatesFromServer() throws Exception { mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); String sharedAttributeName = "sharedAttr"; // Add a new shared attribute @@ -294,7 +294,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); @@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); } @@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertNotNull(requestFromServer); @@ -369,7 +369,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); } @@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { gatewayAttributesRequest.addProperty("key", attributeName); log.info(gatewayAttributesRequest.toString()); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); - MqttEvent clientAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent clientAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertNotNull(clientAttributeEvent); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); @@ -407,12 +407,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { - TimeUnit.SECONDS.sleep(30); + if (slowQueue) { + TimeUnit.SECONDS.sleep(30); + } String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); - TimeUnit.SECONDS.sleep(30); + if (slowQueue) { + TimeUnit.SECONDS.sleep(30); + } List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); From 58313cd66430170765b6a7ab7375b6c1389aedcc Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 13 Jul 2022 14:49:59 +0300 Subject: [PATCH 5/5] Code review changes --- .../server/msa/AbstractContainerTest.java | 7 +++-- .../org/thingsboard/server/msa/WsClient.java | 9 ++++-- .../msa/connectivity/HttpClientTest.java | 2 +- .../msa/connectivity/MqttClientTest.java | 24 +++++++-------- .../connectivity/MqttGatewayClientTest.java | 30 +++++++++---------- 5 files changed, 38 insertions(+), 34 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index ffdcb71c9e..7ccc37c0f8 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -70,7 +70,7 @@ public abstract class AbstractContainerTest { protected static String TB_TOKEN; protected static RestClient restClient; - protected static boolean slowQueue; + protected static long timeoutMultiplier = 1; protected ObjectMapper mapper = new ObjectMapper(); protected JsonParser jsonParser = new JsonParser(); @@ -80,8 +80,9 @@ public abstract class AbstractContainerTest { restClient = new RestClient(HTTPS_URL); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); - String QUEUE_TYPE = System.getProperty("blackBoxTests.queue", "kafka"); - slowQueue = !"kafka".equals(QUEUE_TYPE); + if (!"kafka".equals(System.getProperty("blackBoxTests.queue", "kafka"))) { + timeoutMultiplier = 10; + } } @Rule diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 4da9fe2b24..d4ee31d9be 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -36,8 +36,11 @@ public class WsClient extends WebSocketClient { private CountDownLatch firstReply = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1); - WsClient(URI serverUri) { + private final long timeoutMultiplier; + + WsClient(URI serverUri, long timeoutMultiplier) { super(serverUri); + this.timeoutMultiplier = timeoutMultiplier; } @Override @@ -74,7 +77,7 @@ public class WsClient extends WebSocketClient { public WsTelemetryResponse getLastMessage() { try { - boolean result = latch.await(120, TimeUnit.SECONDS); + boolean result = latch.await(10 * timeoutMultiplier, TimeUnit.SECONDS); if (result) { return this.message; } else { @@ -89,7 +92,7 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - boolean result = firstReply.await(120, TimeUnit.SECONDS); + boolean result = firstReply.await(10 * timeoutMultiplier, TimeUnit.SECONDS); if (!result) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException("Timeout, ws message wasn't received"); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index bd7a9d2e38..f81d03b394 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest { Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); @SuppressWarnings("deprecation") Optional allOptional = restClient.getAttributes(accessToken, null, null); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index ffdbcec0e0..19a53cb1ce 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -178,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); - MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -209,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); String sharedAttributeName = "sharedAttr"; @@ -223,7 +223,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -237,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -255,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -274,7 +274,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -284,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); @@ -308,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest { // Create a new root rule chain RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send the request to the server JsonObject clientRequest = new JsonObject(); clientRequest.addProperty("method", "getResponse"); @@ -317,8 +317,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(slowQueue ? 10 : 1); - MqttEvent responseFromServer = listener.getEvents().poll(slowQueue ? 10 : 1, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(1 * timeoutMultiplier); + MqttEvent responseFromServer = listener.getEvents().poll(1 * timeoutMultiplier, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); @@ -347,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClient mqttClient = getMqttClient(deviceCredentials, listener); restClient.deleteDevice(device.getId()); - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); Assert.assertFalse(mqttClient.isConnected()); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 032b4a3b45..46aa4acd1d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - var event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + var event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); JsonObject requestData = new JsonObject(); requestData.addProperty("id", 1); @@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("value")); @@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent sharedAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent sharedAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); // Catch attribute update event Assert.assertNotNull(sharedAttributeEvent); @@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); checkAttribute(true, clientAttributeValue); checkAttribute(false, sharedAttributeValue); @@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { public void subscribeToAttributeUpdatesFromServer() throws Exception { mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); String sharedAttributeName = "sharedAttr"; // Add a new shared attribute @@ -294,7 +294,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); @@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); } @@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertNotNull(requestFromServer); @@ -369,7 +369,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); } @@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { gatewayAttributesRequest.addProperty("key", attributeName); log.info(gatewayAttributesRequest.toString()); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); - MqttEvent clientAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); + MqttEvent clientAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertNotNull(clientAttributeEvent); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); @@ -407,14 +407,14 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { - if (slowQueue) { + if (timeoutMultiplier > 1) { TimeUnit.SECONDS.sleep(30); } String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); - if (slowQueue) { + if (timeoutMultiplier > 1) { TimeUnit.SECONDS.sleep(30); }