diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index c09f259aaa..c9b739bad1 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -862,7 +862,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected void awaitForDeviceActorToReceiveSubscription(DeviceId deviceId, FeatureType featureType, int subscriptionCount) { DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId); Map subscriptions = (Map) ReflectionTestUtils.getField(processor, getMapName(featureType)); - Awaitility.await("Device actor received subscription command from the transport").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> subscriptions.size() == subscriptionCount); + Awaitility.await("Device actor received subscription command from the transport").atMost(5, TimeUnit.SECONDS).until(() -> { + log.warn("device {}, subscriptions.size() == {}", deviceId, subscriptions.size()); + return subscriptions.size() == subscriptionCount; + }); } protected static String getMapName(FeatureType featureType) { diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 303e91ee80..904a6246ab 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -671,6 +671,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { client.setCallback(onUpdateCallback); client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1); edgeImitator.expectResponsesAmount(1); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index f8165b0237..fa6c4d16a3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -354,6 +354,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); client.publishAndWait(attrPubTopic, CLIENT_ATTRIBUTES_PAYLOAD.getBytes()); client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + //RequestAttributes does not make any subscriptions in device actor + String update = getWsClient().waitForUpdate(); assertThat(update).as("ws update received").isNotBlank(); MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); @@ -383,6 +385,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); client.publishAndWait(attrPubTopic, getAttributesProtoPayloadBytes()); client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE); + //RequestAttributes does not make any subscriptions in device actor + String update = getWsClient().waitForUpdate(); assertThat(update).as("ws update received").isNotBlank(); MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1")); @@ -442,6 +446,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertThat(update).as("ws update received").isNotBlank(); client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); + //RequestAttributes does not make any subscriptions in device actor MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); @@ -495,6 +500,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt assertThat(update).as("ws update received").isNotBlank(); client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE); + awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1); MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC); client.setCallback(clientAttributesCallback); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java index 924b4d50a6..205386a32e 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -223,7 +223,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM MqttTestCallback callback = new MqttTestCallback(GATEWAY_RPC_TOPIC); client.setCallback(callback); - client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE); subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC); String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/attributes/AbstractAttributesMqttV5Test.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/attributes/AbstractAttributesMqttV5Test.java index e3dfc6fb55..347b397ef5 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/attributes/AbstractAttributesMqttV5Test.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/attributes/AbstractAttributesMqttV5Test.java @@ -21,6 +21,7 @@ import org.junit.Before; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback; @@ -104,6 +105,7 @@ public abstract class AbstractAttributesMqttV5Test extends AbstractMqttV5Test { MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback(); client.setCallback(onUpdateCallback); client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1); doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk()); onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/connection/AbstractMqttV5ClientConnectionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/connection/AbstractMqttV5ClientConnectionTest.java index 81b015e245..93d349e0ec 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/connection/AbstractMqttV5ClientConnectionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/connection/AbstractMqttV5ClientConnectionTest.java @@ -25,6 +25,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; @@ -101,6 +102,7 @@ public abstract class AbstractMqttV5ClientConnectionTest extends AbstractMqttInt MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback(); client.setCallback(onUpdateCallback); client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1); String payload = "{\"sharedStr\":\"" + StringUtils.repeat("*", valueLen) + "\"}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/subscribe/AbstractMqttV5ClientSubscriptionTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/subscribe/AbstractMqttV5ClientSubscriptionTest.java index f681ad5b55..aebb5a7ed2 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/subscribe/AbstractMqttV5ClientSubscriptionTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/subscribe/AbstractMqttV5ClientSubscriptionTest.java @@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttSubAck; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; @@ -34,6 +35,7 @@ public abstract class AbstractMqttV5ClientSubscriptionTest extends AbstractMqttI client.connectAndWait(accessToken); IMqttToken subscriptionResult = client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1); MqttWireMessage response = subscriptionResult.getResponse(); @@ -52,6 +54,7 @@ public abstract class AbstractMqttV5ClientSubscriptionTest extends AbstractMqttI client.connectAndWait(accessToken); IMqttToken iMqttToken = client.subscribeAndWait("wrong/topic/+", MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 0); Assert.assertEquals(MESSAGE_TYPE_SUBACK,iMqttToken.getResponse().getType()); MqttSubAck subAck = (MqttSubAck) iMqttToken.getResponse(); Assert.assertEquals(1, subAck.getReturnCodes().length); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/unsubscribe/AbstractMqttV5ClientUnsubscribeTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/unsubscribe/AbstractMqttV5ClientUnsubscribeTest.java index 835e225da4..ca534b9999 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/unsubscribe/AbstractMqttV5ClientUnsubscribeTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/client/unsubscribe/AbstractMqttV5ClientUnsubscribeTest.java @@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttUnsubAck; import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage; import org.junit.Assert; import org.thingsboard.server.common.data.device.profile.MqttTopics; +import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; @@ -34,6 +35,7 @@ public abstract class AbstractMqttV5ClientUnsubscribeTest extends AbstractMqttIn client.connectAndWait(accessToken); client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1); IMqttToken unsubscribeResult = client.unsubscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC); MqttWireMessage response = unsubscribeResult.getResponse(); Assert.assertEquals(MESSAGE_TYPE_UNSUBACK, response.getType()); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/AbstractMqttV5RpcTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/AbstractMqttV5RpcTest.java index b6604439da..aecd1124e3 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/AbstractMqttV5RpcTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv5/rpc/AbstractMqttV5RpcTest.java @@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.MqttException; import org.eclipse.paho.mqttv5.common.MqttMessage; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback; import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient; @@ -45,6 +46,7 @@ public abstract class AbstractMqttV5RpcTest extends AbstractMqttV5Test { MqttV5TestCallback callback = new MqttV5TestCallback(DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0")); client.setCallback(callback); client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.RPC, 1); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String result = doPostAsync("/api/rpc/oneway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk()); @@ -59,6 +61,7 @@ public abstract class AbstractMqttV5RpcTest extends AbstractMqttV5Test { MqttV5TestClient client = new MqttV5TestClient(); client.connectAndWait(accessToken); client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE); + awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.RPC, 1); MqttV5TestRpcCallback callback = new MqttV5TestRpcCallback(client, DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0")); client.setCallback(callback); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";