From c838c7ba4ee4b75ebd254672923eecd69929f250 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 12 Jul 2022 21:54:13 +0300 Subject: [PATCH] Added slowQueue parameter to tune timeouts in runtime --- .../server/msa/AbstractContainerTest.java | 6 ++++ .../org/thingsboard/server/msa/WsClient.java | 2 +- .../msa/connectivity/HttpClientTest.java | 2 +- .../msa/connectivity/MqttClientTest.java | 24 ++++++------- .../connectivity/MqttGatewayClientTest.java | 34 +++++++++++-------- 5 files changed, 39 insertions(+), 29 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index 5ea04de462..ffdcb71c9e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -69,6 +69,9 @@ public abstract class AbstractContainerTest { protected static final String WSS_URL = "wss://localhost"; protected static String TB_TOKEN; protected static RestClient restClient; + + protected static boolean slowQueue; + protected ObjectMapper mapper = new ObjectMapper(); protected JsonParser jsonParser = new JsonParser(); @@ -76,6 +79,9 @@ public abstract class AbstractContainerTest { public static void before() throws Exception { restClient = new RestClient(HTTPS_URL); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); + + String QUEUE_TYPE = System.getProperty("blackBoxTests.queue", "kafka"); + slowQueue = !"kafka".equals(QUEUE_TYPE); } @Rule diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 0f75775808..4da9fe2b24 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -89,7 +89,7 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - boolean result = firstReply.await(10, TimeUnit.SECONDS); + boolean result = firstReply.await(120, TimeUnit.SECONDS); if (!result) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException("Timeout, ws message wasn't received"); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index c721b08f5e..bd7a9d2e38 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest { Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); @SuppressWarnings("deprecation") Optional allOptional = restClient.getAttributes(accessToken, null, null); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 9d6ef02c0c..ffdbcec0e0 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -178,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); - MqttEvent event = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -209,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); String sharedAttributeName = "sharedAttr"; @@ -223,7 +223,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -237,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -255,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -274,7 +274,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -284,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(15, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); @@ -308,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest { // Create a new root rule chain RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send the request to the server JsonObject clientRequest = new JsonObject(); clientRequest.addProperty("method", "getResponse"); @@ -317,8 +317,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(10); - MqttEvent responseFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(slowQueue ? 10 : 1); + MqttEvent responseFromServer = listener.getEvents().poll(slowQueue ? 10 : 1, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); @@ -347,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClient mqttClient = getMqttClient(deviceCredentials, listener); restClient.deleteDevice(device.getId()); - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); Assert.assertFalse(mqttClient.isConnected()); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index 6ead803b76..032b4a3b45 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - var event = listener.getEvents().poll(60, TimeUnit.SECONDS); + var event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); JsonObject requestData = new JsonObject(); requestData.addProperty("id", 1); @@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("value")); @@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent sharedAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent sharedAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); // Catch attribute update event Assert.assertNotNull(sharedAttributeEvent); @@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); checkAttribute(true, clientAttributeValue); checkAttribute(false, sharedAttributeValue); @@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { public void subscribeToAttributeUpdatesFromServer() throws Exception { mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); String sharedAttributeName = "sharedAttr"; // Add a new shared attribute @@ -294,7 +294,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); @@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(60, TimeUnit.SECONDS); + event = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); } @@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(30); + TimeUnit.SECONDS.sleep(slowQueue ? 30 : 3); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertNotNull(requestFromServer); @@ -369,7 +369,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(slowQueue ? 30 : 5, TimeUnit.SECONDS); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); } @@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { gatewayAttributesRequest.addProperty("key", attributeName); log.info(gatewayAttributesRequest.toString()); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); - MqttEvent clientAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS); + MqttEvent clientAttributeEvent = listener.getEvents().poll(slowQueue ? 60 : 10, TimeUnit.SECONDS); Assert.assertNotNull(clientAttributeEvent); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); @@ -407,12 +407,16 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { - TimeUnit.SECONDS.sleep(30); + if (slowQueue) { + TimeUnit.SECONDS.sleep(30); + } String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); - TimeUnit.SECONDS.sleep(30); + if (slowQueue) { + TimeUnit.SECONDS.sleep(30); + } List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON);