handle timeouts on transport tests to provide clear explanation what is the problem. timeout increased for run test stable on slow (busy) environments

This commit is contained in:
Sergey Matvienko 2023-02-17 17:50:50 +01:00
parent ebdcda773c
commit a6a4a67a3d
2 changed files with 32 additions and 19 deletions

View File

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
public abstract class AbstractTransportIntegrationTest extends AbstractControllerTest { public abstract class AbstractTransportIntegrationTest extends AbstractControllerTest {
protected static final int DEFAULT_WAIT_TIMEOUT_SECONDS = 10; protected static final int DEFAULT_WAIT_TIMEOUT_SECONDS = 30;
protected static final String MQTT_URL = "tcp://localhost:1883"; protected static final String MQTT_URL = "tcp://localhost:1883";
protected static final String COAP_BASE_URL = "coap://localhost:5683/api/v1/"; protected static final String COAP_BASE_URL = "coap://localhost:5683/api/v1/";

View File

@ -128,14 +128,16 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onUpdateCallback").isTrue();
validateUpdateAttributesJsonResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD); validateUpdateAttributesJsonResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD);
MqttTestCallback onDeleteCallback = new MqttTestCallback(); MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback); client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onDeleteCallback").isTrue();
validateUpdateAttributesJsonResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE); validateUpdateAttributesJsonResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE);
client.disconnect(); client.disconnect();
} }
@ -148,13 +150,15 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onUpdateCallback").isTrue();
validateUpdateAttributesProtoResponse(onUpdateCallback); validateUpdateAttributesProtoResponse(onUpdateCallback);
MqttTestCallback onDeleteCallback = new MqttTestCallback(); MqttTestCallback onDeleteCallback = new MqttTestCallback();
client.setCallback(onDeleteCallback); client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onDeleteCallback").isTrue();
validateDeleteAttributesProtoResponse(onDeleteCallback); validateDeleteAttributesProtoResponse(onDeleteCallback);
client.disconnect(); client.disconnect();
} }
@ -165,7 +169,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateUpdateAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { protected void validateUpdateAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes()); assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull();
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared"); List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared");
attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList);
@ -181,7 +185,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateDeleteAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { protected void validateDeleteAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException {
assertNotNull(callback.getPayloadBytes()); assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull();
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson");
@ -212,7 +216,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onUpdateCallback").isTrue();
validateJsonGatewayUpdateAttributesResponse(onUpdateCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD); validateJsonGatewayUpdateAttributesResponse(onUpdateCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD);
@ -220,7 +225,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
client.setCallback(onDeleteCallback); client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await onDeleteCallback").isTrue();
validateJsonGatewayUpdateAttributesResponse(onDeleteCallback, deviceName, SHARED_ATTRIBUTES_DELETED_RESPONSE); validateJsonGatewayUpdateAttributesResponse(onDeleteCallback, deviceName, SHARED_ATTRIBUTES_DELETED_RESPONSE);
client.disconnect(); client.disconnect();
@ -249,7 +255,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateJsonGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName, String expectResultData) { protected void validateJsonGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName, String expectResultData) {
assertNotNull(callback.getPayloadBytes()); assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull();
assertEquals(JacksonUtil.toJsonNode(getGatewayAttributesResponseJson(deviceName, expectResultData)), JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals(JacksonUtil.toJsonNode(getGatewayAttributesResponseJson(deviceName, expectResultData)), JacksonUtil.fromBytes(callback.getPayloadBytes()));
} }
@ -263,8 +269,9 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateProtoGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { protected void validateProtoGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
assertNotNull(callback.getPayloadBytes()); .as("await callback").isTrue();
assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull();
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared"); List<TransportProtos.TsKvProto> tsKvProtoList = getTsKvProtoList("shared");
@ -288,8 +295,9 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateProtoGatewayDeleteAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { protected void validateProtoGatewayDeleteAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
assertNotNull(callback.getPayloadBytes()); .as("await callback").isTrue();
assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull();
TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder();
attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson");
TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build();
@ -551,13 +559,15 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException { protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes()));
} }
protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException { protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId()); assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId());
@ -580,14 +590,16 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException { protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}"; String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}";
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes()));
} }
protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());
@ -603,7 +615,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
} }
protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
.as("await callback").isTrue();
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false);
TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());