diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java index 9710c82866..a2754e055d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/attributes/AbstractCoapAttributesIntegrationTest.java @@ -46,11 +46,11 @@ import org.thingsboard.server.transport.coap.CoapTestClient; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -235,25 +235,37 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap CoapTestCallback callbackCoap = new CoapTestCallback(1); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); - + String awaitAlias = "await Json Test Subscribe To AttributesUpdates (client.getObserveRelation)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + 0 == callbackCoap.getObserve().intValue()); if (emptyCurrentStateNotification) { - validateUpdateAttributesJsonResponse(callbackCoap, "{}", 0); + validateUpdateAttributesJsonResponse(callbackCoap, "{}"); } else { - validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION, 0); + validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD_ON_CURRENT_STATE_NOTIFICATION); } - CountDownLatch latch = new CountDownLatch(1); - int expectedObserveCnt = callbackCoap.getObserve().intValue() + 1; + int expectedObserveBeforeAddCnt = callbackCoap.getObserve().intValue() + 1; doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); - validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD, expectedObserveCnt); + awaitAlias = "await Json Test Subscribe To AttributesUpdates (add attributes)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeAddCnt == callbackCoap.getObserve().intValue()); + validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_PAYLOAD); - latch = new CountDownLatch(1); int expectedObserveBeforeDeleteCnt = callbackCoap.getObserve().intValue() + 1; doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); - latch.await(3, TimeUnit.SECONDS); - validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_DELETED_RESPONSE, expectedObserveBeforeDeleteCnt); + awaitAlias = "await Json Test Subscribe To AttributesUpdates (deleted attributes)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeDeleteCnt == callbackCoap.getObserve().intValue()); + validateUpdateAttributesJsonResponse(callbackCoap, SHARED_ATTRIBUTES_DELETED_RESPONSE); observeRelation.proactiveCancel(); assertTrue(observeRelation.isCanceled()); @@ -269,8 +281,13 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap client = new CoapTestClient(accessToken, FeatureType.ATTRIBUTES); CoapTestCallback callbackCoap = new CoapTestCallback(1); + String awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)"; CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + 0 == callbackCoap.getObserve().intValue()); if (emptyCurrentStateNotification) { validateEmptyCurrentStateAttributesProtoResponse(callbackCoap); @@ -278,16 +295,24 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap validateCurrentStateAttributesProtoResponse(callbackCoap); } - CountDownLatch latch = new CountDownLatch(1); - int expectedObserveCnt = callbackCoap.getObserve().intValue() + 1; + int expectedObserveBeforeAddCnt = callbackCoap.getObserve().intValue() + 1; doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); - validateUpdateProtoAttributesResponse(callbackCoap, expectedObserveCnt); + awaitAlias = "await Proto Test Subscribe To Attributes Updates (add attributes)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeAddCnt == callbackCoap.getObserve().intValue()); + validateUpdateProtoAttributesResponse(callbackCoap, expectedObserveBeforeAddCnt); - latch = new CountDownLatch(1); int expectedObserveBeforeDeleteCnt = callbackCoap.getObserve().intValue() + 1; doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class); - latch.await(3, TimeUnit.SECONDS); + awaitAlias = "await Proto Test Subscribe To Attributes Updates (deleted attributes)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeDeleteCnt == callbackCoap.getObserve().intValue()); validateDeleteProtoAttributesResponse(callbackCoap, expectedObserveBeforeDeleteCnt); observeRelation.proactiveCancel(); @@ -314,27 +339,18 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap assertTrue(actualSharedKeyValueProtos.containsAll(expectedSharedKeyValueProtos)); } - protected void validateUpdateAttributesJsonResponse(CoapTestCallback callback, String expectedResponse, int expectedObserveCnt) { + protected void validateUpdateAttributesJsonResponse(CoapTestCallback callback, String expectedResponse) { assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(expectedObserveCnt, callback.getObserve().intValue()); String response = new String(callback.getPayloadBytes(), StandardCharsets.UTF_8); assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.toJsonNode(response)); } protected void validateEmptyCurrentStateAttributesProtoResponse(CoapTestCallback callback) throws InvalidProtocolBufferException { assertArrayEquals(EMPTY_PAYLOAD, callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(0, callback.getObserve().intValue()); } protected void validateCurrentStateAttributesProtoResponse(CoapTestCallback callback) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(0, callback.getObserve().intValue()); TransportProtos.AttributeUpdateNotificationMsg.Builder expectedCurrentStateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); TransportProtos.TsKvProto tsKvProtoAttribute1 = getTsKvProto("sharedStr", "value", TransportProtos.KeyValueType.STRING_V); TransportProtos.TsKvProto tsKvProtoAttribute2 = getTsKvProto("sharedBool", "false", TransportProtos.KeyValueType.BOOLEAN_V); @@ -359,9 +375,6 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap protected void validateUpdateProtoAttributesResponse(CoapTestCallback callback, int expectedObserveCnt) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(expectedObserveCnt, callback.getObserve().intValue()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); List tsKvProtoList = getTsKvProtoList("shared"); attributeUpdateNotificationMsgBuilder.addAllSharedUpdated(tsKvProtoList); @@ -378,9 +391,6 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap protected void validateDeleteProtoAttributesResponse(CoapTestCallback callback, int expectedObserveCnt) throws InvalidProtocolBufferException { assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(expectedObserveCnt, callback.getObserve().intValue()); TransportProtos.AttributeUpdateNotificationMsg.Builder attributeUpdateNotificationMsgBuilder = TransportProtos.AttributeUpdateNotificationMsg.newBuilder(); attributeUpdateNotificationMsgBuilder.addSharedDeleted("sharedJson"); @@ -395,9 +405,10 @@ public abstract class AbstractCoapAttributesIntegrationTest extends AbstractCoap Awaitility.await("awaitClientAfterCancelObserve") .pollInterval(10, TimeUnit.MILLISECONDS) .atMost(5, TimeUnit.SECONDS) - .until(()->{ + .until(() -> { log.trace("awaiting defaultTransportService.sessions is empty"); - return defaultTransportService.sessions.isEmpty();}); + return defaultTransportService.sessions.isEmpty(); + }); } private TransportProtos.GetAttributeResponseMsg getExpectedAttributeResponseMsg() { diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java index beb4d3747c..e17f5b04c2 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/AbstractCoapServerSideRpcIntegrationTest.java @@ -43,6 +43,7 @@ import org.thingsboard.server.transport.coap.CoapTestClient; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -75,14 +76,23 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, true, protobuf); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); + String awaitAlias = "await One Way Rpc (client.getObserveRelation)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + 0 == callbackCoap.getObserve().intValue()); validateCurrentStateNotification(callbackCoap); - - CountDownLatch latch = new CountDownLatch(1); + int expectedObserveBeforeGpioRequestCnt = callbackCoap.getObserve().intValue() + 1; String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); - latch.await(3, TimeUnit.SECONDS); + awaitAlias = "await One Way Rpc setGpio(method, params, value)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeGpioRequestCnt == callbackCoap.getObserve().intValue()); validateOneWayStateChangedNotification(callbackCoap, result); observeRelation.proactiveCancel(); @@ -94,23 +104,38 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC CoapTestCallback callbackCoap = new TestCoapCallbackForRPC(client, 1, false, protobuf); CoapObserveRelation observeRelation = client.getObserveRelation(callbackCoap); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); - + String awaitAlias = "await Two Way Rpc (client.getObserveRelation)"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.VALID.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + 0 == callbackCoap.getObserve().intValue()); validateCurrentStateNotification(callbackCoap); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - + int expectedObserveBeforeGpioRequestAddCnt1 = callbackCoap.getObserve().intValue() + 1; String actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); + awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeGpioRequestAddCnt1 == callbackCoap.getObserve().intValue()); + validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult); - validateTwoWayStateChangedNotification(callbackCoap, 1, expectedResponseResult, actualResult); + validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult); - CountDownLatch latch = new CountDownLatch(1); + int expectedObserveBeforeGpioRequestAddCnt2 = callbackCoap.getObserve().intValue() + 1; actualResult = doPostAsync("/api/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isOk()); - callbackCoap.getLatch().await(3, TimeUnit.SECONDS); + awaitAlias = "await Two Way Rpc (setGpio(method, params, value) first"; + await(awaitAlias) + .atMost(10, TimeUnit.SECONDS) + .until(() -> CoAP.ResponseCode.CONTENT.equals(callbackCoap.getResponseCode()) && + callbackCoap.getObserve() != null && + expectedObserveBeforeGpioRequestAddCnt2 == callbackCoap.getObserve().intValue()); - validateTwoWayStateChangedNotification(callbackCoap, 2, expectedResponseResult, actualResult); + validateTwoWayStateChangedNotification(callbackCoap, expectedResponseResult, actualResult); observeRelation.proactiveCancel(); assertTrue(observeRelation.isCanceled()); @@ -184,25 +209,16 @@ public abstract class AbstractCoapServerSideRpcIntegrationTest extends AbstractC private void validateCurrentStateNotification(CoapTestCallback callback) { assertArrayEquals(EMPTY_PAYLOAD, callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(callback.getResponseCode(), CoAP.ResponseCode.VALID); - assertEquals(0, callback.getObserve().intValue()); } private void validateOneWayStateChangedNotification(CoapTestCallback callback, String result) { assertTrue(StringUtils.isEmpty(result)); assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(1, callback.getObserve().intValue()); } - private void validateTwoWayStateChangedNotification(CoapTestCallback callback, int expectedObserveNumber, String expectedResult, String actualResult) { + private void validateTwoWayStateChangedNotification(CoapTestCallback callback, String expectedResult, String actualResult) { assertEquals(expectedResult, actualResult); assertNotNull(callback.getPayloadBytes()); - assertNotNull(callback.getObserve()); - assertEquals(CoAP.ResponseCode.CONTENT, callback.getResponseCode()); - assertEquals(expectedObserveNumber, callback.getObserve().intValue()); } protected class TestCoapCallbackForRPC extends CoapTestCallback { diff --git a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/CoapServerSideRpcJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/CoapServerSideRpcJsonIntegrationTest.java index 96ab02325a..29fe4faf70 100644 --- a/application/src/test/java/org/thingsboard/server/transport/coap/rpc/CoapServerSideRpcJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/coap/rpc/CoapServerSideRpcJsonIntegrationTest.java @@ -52,5 +52,4 @@ public class CoapServerSideRpcJsonIntegrationTest extends AbstractCoapServerSide public void testServerCoapTwoWayRpc() throws Exception { processTwoWayRpcTest("{\"value1\":\"A\",\"value2\":\"B\"}", false); } - }