diff --git a/application/src/test/java/org/thingsboard/server/transport/AbstractTransportIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/AbstractTransportIntegrationTest.java index cf4de21cc2..89a4b707bf 100644 --- a/application/src/test/java/org/thingsboard/server/transport/AbstractTransportIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/AbstractTransportIntegrationTest.java @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public abstract class AbstractTransportIntegrationTest extends AbstractControllerTest { - protected static final int DEFAULT_WAIT_TIMEOUT_SECONDS = 10; + protected static final int DEFAULT_WAIT_TIMEOUT_SECONDS = 30; protected static final String MQTT_URL = "tcp://localhost:1883"; protected static final String COAP_BASE_URL = "coap://localhost:5683/api/v1/"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index 6d19e1261d..7feafead95 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -128,14 +128,16 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onUpdateCallback").isTrue(); validateUpdateAttributesJsonResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD); MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); - onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onDeleteCallback").isTrue(); validateUpdateAttributesJsonResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE); client.disconnect(); } @@ -148,13 +150,15 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onUpdateCallback").isTrue(); validateUpdateAttributesProtoResponse(onUpdateCallback); MqttTestCallback onDeleteCallback = new MqttTestCallback(); client.setCallback(onDeleteCallback); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); - onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onDeleteCallback").isTrue(); validateDeleteAttributesProtoResponse(onDeleteCallback); client.disconnect(); } @@ -165,7 +169,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateUpdateAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { - assertNotNull(callback.getPayloadBytes()); + assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); List tsKvProtoList = getTsKvProtoList("shared"); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); @@ -181,7 +185,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateDeleteAttributesProtoResponse(MqttTestCallback callback) throws InvalidProtocolBufferException { - assertNotNull(callback.getPayloadBytes()); + assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); @@ -212,7 +216,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.subscribeAndWait(GATEWAY_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onUpdateCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onUpdateCallback").isTrue(); validateJsonGatewayUpdateAttributesResponse(onUpdateCallback, deviceName, SHARED_ATTRIBUTES_PAYLOAD); @@ -220,7 +225,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt client.setCallback(onDeleteCallback); doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); - onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(onDeleteCallback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await onDeleteCallback").isTrue(); validateJsonGatewayUpdateAttributesResponse(onDeleteCallback, deviceName, SHARED_ATTRIBUTES_DELETED_RESPONSE); client.disconnect(); @@ -249,7 +255,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateJsonGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName, String expectResultData) { - assertNotNull(callback.getPayloadBytes()); + assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull(); assertEquals(JacksonUtil.toJsonNode(getGatewayAttributesResponseJson(deviceName, expectResultData)), JacksonUtil.fromBytes(callback.getPayloadBytes())); } @@ -263,8 +269,9 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateProtoGatewayUpdateAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertNotNull(callback.getPayloadBytes()); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); + assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); List tsKvProtoList = getTsKvProtoList("shared"); @@ -288,8 +295,9 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateProtoGatewayDeleteAttributesResponse(MqttTestCallback callback, String deviceName) throws InvalidProtocolBufferException, InterruptedException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertNotNull(callback.getPayloadBytes()); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); + assertThat(callback.getPayloadBytes()).as("callback payload non-null").isNotNull(); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); TransportProtos.AttributeUpdateNotificationMsg attributeUpdateNotificationMsg = attributeUpdateNotificationMsgBuilder.build(); @@ -551,13 +559,15 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateJsonResponse(MqttTestCallback callback, String expectedResponse) throws InterruptedException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes())); } protected void validateProtoResponse(MqttTestCallback callback, TransportProtos.GetAttributeResponseMsg expectedResponse) throws InterruptedException, InvalidProtocolBufferException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); TransportProtos.GetAttributeResponseMsg actualAttributesResponse = TransportProtos.GetAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); assertEquals(expectedResponse.getRequestId(), actualAttributesResponse.getRequestId()); @@ -580,14 +590,16 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateJsonResponseGateway(MqttTestCallback callback, String deviceName, String expectedValues) throws InterruptedException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); String expectedRequestPayload = "{\"id\":1,\"device\":\"" + deviceName + "\",\"values\":" + expectedValues + "}"; assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); } protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes()); @@ -603,7 +615,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt } protected void validateProtoSharedResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { - callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, false); TransportApiProtos.GatewayAttributeResponseMsg actualGatewayAttributeResponseMsg = TransportApiProtos.GatewayAttributeResponseMsg.parseFrom(callback.getPayloadBytes());