Merge pull request #6933 from volodymyr-babak/queue-black-box-tests

[WIP][3.4] Queue black box tests
This commit is contained in:
Andrew Shvayka 2022-07-13 16:28:39 +03:00 committed by GitHub
commit d7534073dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 75 additions and 39 deletions

View File

@ -30,13 +30,13 @@ import java.util.Map;
public class TbPubSubSubscriptionSettings { public class TbPubSubSubscriptionSettings {
@Value("${queue.pubsub.queue-properties.core:}") @Value("${queue.pubsub.queue-properties.core:}")
private String coreProperties; private String coreProperties;
@Value("${queue.pubsub.queue-properties.rule-engine}") @Value("${queue.pubsub.queue-properties.rule-engine:}")
private String ruleEngineProperties; private String ruleEngineProperties;
@Value("${queue.pubsub.queue-properties.transport-api}") @Value("${queue.pubsub.queue-properties.transport-api:}")
private String transportApiProperties; private String transportApiProperties;
@Value("${queue.pubsub.queue-properties.notifications}") @Value("${queue.pubsub.queue-properties.notifications:}")
private String notificationsProperties; private String notificationsProperties;
@Value("${queue.pubsub.queue-properties.js-executor}") @Value("${queue.pubsub.queue-properties.js-executor:}")
private String jsExecutorProperties; private String jsExecutorProperties;
@Value("${queue.pubsub.queue-properties.version-control:}") @Value("${queue.pubsub.queue-properties.version-control:}")
private String vcProperties; private String vcProperties;

View File

@ -69,6 +69,9 @@ public abstract class AbstractContainerTest {
protected static final String WSS_URL = "wss://localhost"; protected static final String WSS_URL = "wss://localhost";
protected static String TB_TOKEN; protected static String TB_TOKEN;
protected static RestClient restClient; protected static RestClient restClient;
protected static long timeoutMultiplier = 1;
protected ObjectMapper mapper = new ObjectMapper(); protected ObjectMapper mapper = new ObjectMapper();
protected JsonParser jsonParser = new JsonParser(); protected JsonParser jsonParser = new JsonParser();
@ -76,6 +79,10 @@ public abstract class AbstractContainerTest {
public static void before() throws Exception { public static void before() throws Exception {
restClient = new RestClient(HTTPS_URL); restClient = new RestClient(HTTPS_URL);
restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert()); restClient.getRestTemplate().setRequestFactory(getRequestFactoryForSelfSignedCert());
if (!"kafka".equals(System.getProperty("blackBoxTests.queue", "kafka"))) {
timeoutMultiplier = 10;
}
} }
@Rule @Rule

View File

@ -107,14 +107,26 @@ public class ContainerTestSuite {
case "aws-sqs": case "aws-sqs":
replaceInFile(targetDir, "queue-aws-sqs.env", replaceInFile(targetDir, "queue-aws-sqs.env",
Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"), Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"),
"YOUR_SECRET", "blackBoxTests.awsSecret", "YOUR_SECRET", getSysProp("blackBoxTests.awsSecret"),
"YOUR_REGION", "blackBoxTests.awsRegion")); "YOUR_REGION", getSysProp("blackBoxTests.awsRegion")));
break; break;
case "rabbitmq": case "rabbitmq":
composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml")); composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml"));
replaceInFile(targetDir, "queue-rabbitmq.env", replaceInFile(targetDir, "queue-rabbitmq.env",
Map.of("localhost", "rabbitmq")); Map.of("localhost", "rabbitmq"));
break; break;
case "service-bus":
replaceInFile(targetDir, "queue-service-bus.env",
Map.of("YOUR_NAMESPACE_NAME", getSysProp("blackBoxTests.serviceBusNamespace"),
"YOUR_SAS_KEY_NAME", getSysProp("blackBoxTests.serviceBusSASPolicy")));
replaceInFile(targetDir, "queue-service-bus.env",
Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey")));
break;
case "pubsub":
replaceInFile(targetDir, "queue-pubsub.env",
Map.of("YOUR_PROJECT_ID", getSysProp("blackBoxTests.pubSubProjectId"),
"YOUR_SERVICE_ACCOUNT", getSysProp("blackBoxTests.pubSubServiceAccount")));
break;
default: default:
throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE); throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE);
} }

View File

@ -36,8 +36,11 @@ public class WsClient extends WebSocketClient {
private CountDownLatch firstReply = new CountDownLatch(1); private CountDownLatch firstReply = new CountDownLatch(1);
private CountDownLatch latch = new CountDownLatch(1); private CountDownLatch latch = new CountDownLatch(1);
WsClient(URI serverUri) { private final long timeoutMultiplier;
WsClient(URI serverUri, long timeoutMultiplier) {
super(serverUri); super(serverUri);
this.timeoutMultiplier = timeoutMultiplier;
} }
@Override @Override
@ -74,8 +77,13 @@ public class WsClient extends WebSocketClient {
public WsTelemetryResponse getLastMessage() { public WsTelemetryResponse getLastMessage() {
try { try {
latch.await(10, TimeUnit.SECONDS); boolean result = latch.await(10 * timeoutMultiplier, TimeUnit.SECONDS);
return this.message; if (result) {
return this.message;
} else {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException("Timeout, ws message wasn't received");
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Timeout, ws message wasn't received"); log.error("Timeout, ws message wasn't received");
} }
@ -84,7 +92,11 @@ public class WsClient extends WebSocketClient {
void waitForFirstReply() { void waitForFirstReply() {
try { try {
firstReply.await(10, TimeUnit.SECONDS); boolean result = firstReply.await(10 * timeoutMultiplier, TimeUnit.SECONDS);
if (!result) {
log.error("Timeout, ws message wasn't received");
throw new RuntimeException("Timeout, ws message wasn't received");
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Timeout, ws message wasn't received"); log.error("Timeout, ws message wasn't received");
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest {
Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful()); Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful());
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
Optional<JsonNode> allOptional = restClient.getAttributes(accessToken, null, null); Optional<JsonNode> allOptional = restClient.getAttributes(accessToken, null, null);

View File

@ -28,9 +28,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*; import org.junit.*;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -181,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
// Request attributes // Request attributes
JsonObject request = new JsonObject(); JsonObject request = new JsonObject();
request.addProperty("clientKeys", "clientAttr"); request.addProperty("clientKeys", "clientAttr");
request.addProperty("sharedKeys", "sharedAttr"); request.addProperty("sharedKeys", "sharedAttr");
mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get(); mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get();
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class); AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
log.info("Received telemetry: {}", attributes); log.info("Received telemetry: {}", attributes);
@ -212,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
String sharedAttributeName = "sharedAttr"; String sharedAttributeName = "sharedAttr";
@ -226,7 +223,7 @@ public class MqttClientTest extends AbstractContainerTest {
device.getId()); device.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertEquals(sharedAttributeValue, Assert.assertEquals(sharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
@ -240,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest {
device.getId()); device.getId());
Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
event = listener.getEvents().poll(10, TimeUnit.SECONDS); event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertEquals(updatedSharedAttributeValue, Assert.assertEquals(updatedSharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()); mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
@ -258,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
// Send an RPC from the server // Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject(); JsonObject serverRpcPayload = new JsonObject();
@ -277,7 +274,7 @@ public class MqttClientTest extends AbstractContainerTest {
}); });
// Wait for RPC call from the server and send the response // Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage()); Assert.assertEquals("{\"method\":\"getValue\",\"params\":true}", Objects.requireNonNull(requestFromServer).getMessage());
@ -287,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Send a response to the server's RPC request // Send a response to the server's RPC request
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get(); mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS);
service.shutdownNow(); service.shutdownNow();
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
@ -311,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Create a new root rule chain // Create a new root rule chain
RuleChainId ruleChainId = createRootRuleChainForRpcResponse(); RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
// Send the request to the server // Send the request to the server
JsonObject clientRequest = new JsonObject(); JsonObject clientRequest = new JsonObject();
clientRequest.addProperty("method", "getResponse"); clientRequest.addProperty("method", "getResponse");
@ -320,8 +317,8 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get();
// Check the response from the server // Check the response from the server
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1 * timeoutMultiplier);
MqttEvent responseFromServer = listener.getEvents().poll(1, TimeUnit.SECONDS); MqttEvent responseFromServer = listener.getEvents().poll(1 * timeoutMultiplier, TimeUnit.SECONDS);
Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length()));
Assert.assertEquals(requestId, responseId); Assert.assertEquals(requestId, responseId);
Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText()); Assert.assertEquals("requestReceived", mapper.readTree(responseFromServer.getMessage()).get("response").asText());
@ -350,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, listener); MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
restClient.deleteDevice(device.getId()); restClient.deleteDevice(device.getId());
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
Assert.assertFalse(mqttClient.isConnected()); Assert.assertFalse(mqttClient.isConnected());
} }

View File

@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
createdDevice.getId()); createdDevice.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
var event = listener.getEvents().poll(10, TimeUnit.SECONDS); var event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
JsonObject requestData = new JsonObject(); JsonObject requestData = new JsonObject();
requestData.addProperty("id", 1); requestData.addProperty("id", 1);
@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS); event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("value")); Assert.assertTrue(responseData.has("value"));
@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS); event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("values")); Assert.assertTrue(responseData.has("values"));
@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS); event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject(); responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("values")); Assert.assertTrue(responseData.has("values"));
@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class, mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
createdDevice.getId()); createdDevice.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
MqttEvent sharedAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent sharedAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
// Catch attribute update event // Catch attribute update event
Assert.assertNotNull(sharedAttributeEvent); Assert.assertNotNull(sharedAttributeEvent);
@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
checkAttribute(true, clientAttributeValue); checkAttribute(true, clientAttributeValue);
checkAttribute(false, sharedAttributeValue); checkAttribute(false, sharedAttributeValue);
@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
public void subscribeToAttributeUpdatesFromServer() throws Exception { public void subscribeToAttributeUpdatesFromServer() throws Exception {
mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
String sharedAttributeName = "sharedAttr"; String sharedAttributeName = "sharedAttr";
// Add a new shared attribute // Add a new shared attribute
@ -294,7 +294,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
createdDevice.getId()); createdDevice.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertEquals(sharedAttributeValue, Assert.assertEquals(sharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText());
@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
createdDevice.getId()); createdDevice.getId());
Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
event = listener.getEvents().poll(10, TimeUnit.SECONDS); event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertEquals(updatedSharedAttributeValue, Assert.assertEquals(updatedSharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText()); mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText());
} }
@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get(); mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed // Wait until subscription is processed
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
// Send an RPC from the server // Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject(); JsonObject serverRpcPayload = new JsonObject();
@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
}); });
// Wait for RPC call from the server and send the response // Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
service.shutdownNow(); service.shutdownNow();
Assert.assertNotNull(requestFromServer); Assert.assertNotNull(requestFromServer);
@ -369,7 +369,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
// Send a response to the server's RPC request // Send a response to the server's RPC request
mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get(); mqttClient.publish(gatewayRpcTopic, Unpooled.wrappedBuffer(gatewayResponse.toString().getBytes())).get();
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS); ResponseEntity serverResponse = future.get(5 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful()); Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody()); Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
} }
@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
gatewayAttributesRequest.addProperty("key", attributeName); gatewayAttributesRequest.addProperty("key", attributeName);
log.info(gatewayAttributesRequest.toString()); log.info(gatewayAttributesRequest.toString());
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get(); mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get();
MqttEvent clientAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS); MqttEvent clientAttributeEvent = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
Assert.assertNotNull(clientAttributeEvent); Assert.assertNotNull(clientAttributeEvent);
JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject(); JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject();
@ -407,9 +407,17 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
} }
private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception { private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception {
if (timeoutMultiplier > 1) {
TimeUnit.SECONDS.sleep(30);
}
String deviceName = "mqtt_device"; String deviceName = "mqtt_device";
mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get(); mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get();
if (timeoutMultiplier > 1) {
TimeUnit.SECONDS.sleep(30);
}
List<EntityRelation> relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON); List<EntityRelation> relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON);
Assert.assertEquals(1, relations.size()); Assert.assertEquals(1, relations.size());