diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java index a586e4f6fa..1f5430a0bb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSubscriptionSettings.java @@ -30,13 +30,13 @@ import java.util.Map; public class TbPubSubSubscriptionSettings { @Value("${queue.pubsub.queue-properties.core:}") private String coreProperties; - @Value("${queue.pubsub.queue-properties.rule-engine}") + @Value("${queue.pubsub.queue-properties.rule-engine:}") private String ruleEngineProperties; - @Value("${queue.pubsub.queue-properties.transport-api}") + @Value("${queue.pubsub.queue-properties.transport-api:}") private String transportApiProperties; - @Value("${queue.pubsub.queue-properties.notifications}") + @Value("${queue.pubsub.queue-properties.notifications:}") private String notificationsProperties; - @Value("${queue.pubsub.queue-properties.js-executor}") + @Value("${queue.pubsub.queue-properties.js-executor:}") private String jsExecutorProperties; @Value("${queue.pubsub.queue-properties.version-control:}") private String vcProperties; diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index 5ea04de462..7ccc37c0f8 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -69,6 +69,9 @@ public abstract class AbstractContainerTest { protected static final String WSS_URL = "wss://localhost"; protected static String TB_TOKEN; protected static RestClient restClient; + + protected static long timeoutMultiplier = 1; + protected ObjectMapper mapper = new ObjectMapper(); protected JsonParser jsonParser = new JsonParser(); @@ -76,6 +79,10 @@ public abstract class AbstractContainerTest { public static void before() throws Exception { restClient = new RestClient(HTTPS_URL); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); + + if (!"kafka".equals(System.getProperty("blackBoxTests.queue", "kafka"))) { + timeoutMultiplier = 10; + } } @Rule diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java index ffd2e1f50a..47acbde16e 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ContainerTestSuite.java @@ -107,14 +107,26 @@ public class ContainerTestSuite { case "aws-sqs": replaceInFile(targetDir, "queue-aws-sqs.env", Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"), - "YOUR_SECRET", "blackBoxTests.awsSecret", - "YOUR_REGION", "blackBoxTests.awsRegion")); + "YOUR_SECRET", getSysProp("blackBoxTests.awsSecret"), + "YOUR_REGION", getSysProp("blackBoxTests.awsRegion"))); break; case "rabbitmq": composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml")); replaceInFile(targetDir, "queue-rabbitmq.env", Map.of("localhost", "rabbitmq")); break; + case "service-bus": + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_NAMESPACE_NAME", getSysProp("blackBoxTests.serviceBusNamespace"), + "YOUR_SAS_KEY_NAME", getSysProp("blackBoxTests.serviceBusSASPolicy"))); + replaceInFile(targetDir, "queue-service-bus.env", + Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey"))); + break; + case "pubsub": + replaceInFile(targetDir, "queue-pubsub.env", + Map.of("YOUR_PROJECT_ID", getSysProp("blackBoxTests.pubSubProjectId"), + "YOUR_SERVICE_ACCOUNT", getSysProp("blackBoxTests.pubSubServiceAccount"))); + break; default: throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java index 547ebaf575..d4ee31d9be 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/WsClient.java @@ -36,8 +36,11 @@ public class WsClient extends WebSocketClient { private CountDownLatch firstReply = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1); - WsClient(URI serverUri) { + private final long timeoutMultiplier; + + WsClient(URI serverUri, long timeoutMultiplier) { super(serverUri); + this.timeoutMultiplier = timeoutMultiplier; } @Override @@ -74,8 +77,13 @@ public class WsClient extends WebSocketClient { public WsTelemetryResponse getLastMessage() { try { - latch.await(10, TimeUnit.SECONDS); - return this.message; + boolean result = latch.await(10 * timeoutMultiplier, TimeUnit.SECONDS); + if (result) { + return this.message; + } else { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); } @@ -84,7 +92,11 @@ public class WsClient extends WebSocketClient { void waitForFirstReply() { try { - firstReply.await(10, TimeUnit.SECONDS); + boolean result = firstReply.await(10 * timeoutMultiplier, TimeUnit.SECONDS); + if (!result) { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException("Timeout, ws message wasn't received"); + } } catch (InterruptedException e) { log.error("Timeout, ws message wasn't received"); throw new RuntimeException(e); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java index a774888af5..f81d03b394 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/HttpClientTest.java @@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest { Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); @SuppressWarnings("deprecation") Optional allOptional = restClient.getAttributes(accessToken, null, null); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index 336d75bb45..19a53cb1ce 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -28,9 +28,6 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.junit.*; -import org.junit.rules.TestRule; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; @@ -181,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Request attributes JsonObject request = new JsonObject(); request.addProperty("clientKeys", "clientAttr"); request.addProperty("sharedKeys", "sharedAttr"); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); log.info("Received telemetry: {}", attributes); @@ -212,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); String sharedAttributeName = "sharedAttr"; @@ -226,7 +223,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -240,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest { device.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); @@ -258,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -277,7 +274,7 @@ public class MqttClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); @@ -287,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); @@ -311,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest { // Create a new root rule chain RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send the request to the server JsonObject clientRequest = new JsonObject(); clientRequest.addProperty("method", "getResponse"); @@ -320,8 +317,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(1); - MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(1 * timeoutMultiplier); + MqttEvent responseFromServer = listener.getEvents().poll(1 * timeoutMultiplier, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Assert.assertEquals(requestId, responseId); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); @@ -350,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest { MqttClient mqttClient = getMqttClient(deviceCredentials, listener); restClient.deleteDevice(device.getId()); - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); Assert.assertFalse(mqttClient.isConnected()); } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java index ec77cbec36..46aa4acd1d 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttGatewayClientTest.java @@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - var event = listener.getEvents().poll(10, TimeUnit.SECONDS); + var event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); JsonObject requestData = new JsonObject(); requestData.addProperty("id", 1); @@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("value")); @@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); Assert.assertTrue(responseData.has("values")); @@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent sharedAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent sharedAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); // Catch attribute update event Assert.assertNotNull(sharedAttributeEvent); @@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); checkAttribute(true, clientAttributeValue); checkAttribute(false, sharedAttributeValue); @@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { public void subscribeToAttributeUpdatesFromServer() throws Exception { mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); String sharedAttributeName = "sharedAttr"; // Add a new shared attribute @@ -294,7 +294,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); - MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(sharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); @@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { createdDevice.getId()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); - event = listener.getEvents().poll(10, TimeUnit.SECONDS); + event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertEquals(updatedSharedAttributeValue, mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); } @@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); // Wait until subscription is processed - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); // Send an RPC from the server JsonObject serverRpcPayload = new JsonObject(); @@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { }); // Wait for RPC call from the server and send the response - MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); service.shutdownNow(); Assert.assertNotNull(requestFromServer); @@ -369,7 +369,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { // Send a response to the server's RPC request mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); - ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); + ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); } @@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest { gatewayAttributesRequest.addProperty("key", attributeName); log.info(gatewayAttributesRequest.toString()); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); - MqttEvent clientAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); + MqttEvent clientAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); Assert.assertNotNull(clientAttributeEvent); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); @@ -407,9 +407,17 @@ public class MqttGatewayClientTest extends AbstractContainerTest { } private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { + if (timeoutMultiplier > 1) { + TimeUnit.SECONDS.sleep(30); + } + String deviceName = "mqtt_device"; mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); + if (timeoutMultiplier > 1) { + TimeUnit.SECONDS.sleep(30); + } + List relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); Assert.assertEquals(1, relations.size());