Added pubsub queue support in bbt. Increased timeouts for slow queues

This commit is contained in:
Volodymyr Babak 2022-07-12 21:39:13 +03:00
parent 69e2eef64d
commit bf5f80c925
4 changed files with 28 additions and 24 deletions

View File

@ -122,6 +122,11 @@ public class ContainerTestSuite {
replaceInFile(targetDir, "queue-service-bus.env",
Map.of("YOUR_SAS_KEY", getSysProp("blackBoxTests.serviceBusPrimaryKey")));
break;
case "pubsub":
replaceInFile(targetDir, "queue-pubsub.env",
Map.of("YOUR_PROJECT_ID", getSysProp("blackBoxTests.pubSubProjectId"),
"YOUR_SERVICE_ACCOUNT", getSysProp("blackBoxTests.pubSubServiceAccount")));
break;
default:
throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE);
}

View File

@ -90,7 +90,7 @@ public class HttpClientTest extends AbstractContainerTest {
Assert.assertTrue(deviceClientsAttributes.getStatusCode().is2xxSuccessful());
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
@SuppressWarnings("deprecation")
Optional<JsonNode> allOptional = restClient.getAttributes(accessToken, null, null);

View File

@ -28,9 +28,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.*;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
@ -181,14 +178,14 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
// Request attributes
JsonObject request = new JsonObject();
request.addProperty("clientKeys", "clientAttr");
request.addProperty("sharedKeys", "sharedAttr");
mqttClient.publish("v1/devices/me/attributes/request/" + new Random().nextInt(100), Unpooled.wrappedBuffer(request.toString().getBytes())).get();
MqttEvent event = listener.getEvents().poll(10, TimeUnit.SECONDS);
MqttEvent event = listener.getEvents().poll(60, TimeUnit.SECONDS);
AttributesResponse attributes = mapper.readValue(Objects.requireNonNull(event).getMessage(), AttributesResponse.class);
log.info("Received telemetry: {}", attributes);
@ -212,7 +209,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
String sharedAttributeName = "sharedAttr";
@ -240,7 +237,7 @@ public class MqttClientTest extends AbstractContainerTest {
device.getId());
Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
event = listener.getEvents().poll(60, TimeUnit.SECONDS);
Assert.assertEquals(updatedSharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText());
@ -258,7 +255,7 @@ public class MqttClientTest extends AbstractContainerTest {
mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
// Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject();
@ -287,7 +284,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Send a response to the server's RPC request
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
ResponseEntity serverResponse = future.get(5, TimeUnit.SECONDS);
ResponseEntity serverResponse = future.get(15, TimeUnit.SECONDS);
service.shutdownNow();
Assert.assertTrue(serverResponse.getStatusCode().is2xxSuccessful());
Assert.assertEquals(clientResponse.toString(), serverResponse.getBody());
@ -311,7 +308,7 @@ public class MqttClientTest extends AbstractContainerTest {
// Create a new root rule chain
RuleChainId ruleChainId = createRootRuleChainForRpcResponse();
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
// Send the request to the server
JsonObject clientRequest = new JsonObject();
clientRequest.addProperty("method", "getResponse");
@ -350,7 +347,7 @@ public class MqttClientTest extends AbstractContainerTest {
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
restClient.deleteDevice(device.getId());
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
Assert.assertFalse(mqttClient.isConnected());
}

View File

@ -168,7 +168,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
createdDevice.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
var event = listener.getEvents().poll(10, TimeUnit.SECONDS);
var event = listener.getEvents().poll(60, TimeUnit.SECONDS);
JsonObject requestData = new JsonObject();
requestData.addProperty("id", 1);
@ -178,7 +178,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
event = listener.getEvents().poll(60, TimeUnit.SECONDS);
JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("value"));
@ -195,7 +195,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
event = listener.getEvents().poll(60, TimeUnit.SECONDS);
responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("values"));
@ -213,7 +213,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
event = listener.getEvents().poll(60, TimeUnit.SECONDS);
responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
Assert.assertTrue(responseData.has("values"));
@ -256,7 +256,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
createdDevice.getId());
Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
MqttEvent sharedAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS);
MqttEvent sharedAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS);
// Catch attribute update event
Assert.assertNotNull(sharedAttributeEvent);
@ -266,7 +266,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
checkAttribute(true, clientAttributeValue);
checkAttribute(false, sharedAttributeValue);
@ -276,7 +276,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
public void subscribeToAttributeUpdatesFromServer() throws Exception {
mqttClient.on("v1/gateway/attributes", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
String sharedAttributeName = "sharedAttr";
// Add a new shared attribute
@ -313,7 +313,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
createdDevice.getId());
Assert.assertTrue(updatedSharedAttributesResponse.getStatusCode().is2xxSuccessful());
event = listener.getEvents().poll(10, TimeUnit.SECONDS);
event = listener.getEvents().poll(60, TimeUnit.SECONDS);
Assert.assertEquals(updatedSharedAttributeValue,
mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get("data").get(sharedAttributeName).asText());
}
@ -324,7 +324,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
mqttClient.on(gatewayRpcTopic, listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(30);
// Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject();
@ -343,7 +343,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
});
// Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10, TimeUnit.SECONDS);
MqttEvent requestFromServer = listener.getEvents().poll(60, TimeUnit.SECONDS);
service.shutdownNow();
Assert.assertNotNull(requestFromServer);
@ -396,7 +396,7 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
gatewayAttributesRequest.addProperty("key", attributeName);
log.info(gatewayAttributesRequest.toString());
mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(gatewayAttributesRequest.toString().getBytes())).get();
MqttEvent clientAttributeEvent = listener.getEvents().poll(10, TimeUnit.SECONDS);
MqttEvent clientAttributeEvent = listener.getEvents().poll(60, TimeUnit.SECONDS);
Assert.assertNotNull(clientAttributeEvent);
JsonObject responseMessage = new JsonParser().parse(Objects.requireNonNull(clientAttributeEvent).getMessage()).getAsJsonObject();
@ -407,10 +407,12 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
}
private Device createDeviceThroughGateway(MqttClient mqttClient, Device gatewayDevice) throws Exception {
TimeUnit.SECONDS.sleep(30);
String deviceName = "mqtt_device";
mqttClient.publish("v1/gateway/connect", Unpooled.wrappedBuffer(createGatewayConnectPayload(deviceName).toString().getBytes()), MqttQoS.AT_LEAST_ONCE).get();
TimeUnit.SECONDS.sleep(60);
TimeUnit.SECONDS.sleep(30);
List<EntityRelation> relations = restClient.findByFrom(gatewayDevice.getId(), RelationTypeGroup.COMMON);