Merge pull request #8220 from smatvienko-tb/fix/tests-awaitForDeviceActorToReceiveSubscription
[3.5] Fix tests - await for device actor to receive subscription
This commit is contained in:
commit
444c05a76c
@ -862,7 +862,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
protected void awaitForDeviceActorToReceiveSubscription(DeviceId deviceId, FeatureType featureType, int subscriptionCount) {
|
||||
DeviceActorMessageProcessor processor = getDeviceActorProcessor(deviceId);
|
||||
Map<UUID, SessionInfo> subscriptions = (Map<UUID, SessionInfo>) ReflectionTestUtils.getField(processor, getMapName(featureType));
|
||||
Awaitility.await("Device actor received subscription command from the transport").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> subscriptions.size() == subscriptionCount);
|
||||
Awaitility.await("Device actor received subscription command from the transport").atMost(5, TimeUnit.SECONDS).until(() -> {
|
||||
log.warn("device {}, subscriptions.size() == {}", deviceId, subscriptions.size());
|
||||
return subscriptions.size() == subscriptionCount;
|
||||
});
|
||||
}
|
||||
|
||||
protected static String getMapName(FeatureType featureType) {
|
||||
|
||||
@ -671,6 +671,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
||||
client.setCallback(onUpdateCallback);
|
||||
|
||||
client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
|
||||
edgeImitator.expectResponsesAmount(1);
|
||||
|
||||
|
||||
@ -354,6 +354,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
|
||||
SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
client.publishAndWait(attrPubTopic, CLIENT_ATTRIBUTES_PAYLOAD.getBytes());
|
||||
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
|
||||
//RequestAttributes does not make any subscriptions in device actor
|
||||
|
||||
String update = getWsClient().waitForUpdate();
|
||||
assertThat(update).as("ws update received").isNotBlank();
|
||||
MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
|
||||
@ -383,6 +385,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
client.publishAndWait(attrPubTopic, getAttributesProtoPayloadBytes());
|
||||
client.subscribeAndWait(attrSubTopic, MqttQoS.AT_MOST_ONCE);
|
||||
//RequestAttributes does not make any subscriptions in device actor
|
||||
|
||||
String update = getWsClient().waitForUpdate();
|
||||
assertThat(update).as("ws update received").isNotBlank();
|
||||
MqttTestCallback callback = new MqttTestCallback(attrSubTopic.replace("+", "1"));
|
||||
@ -442,6 +446,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
|
||||
assertThat(update).as("ws update received").isNotBlank();
|
||||
|
||||
client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
|
||||
//RequestAttributes does not make any subscriptions in device actor
|
||||
|
||||
MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
|
||||
client.setCallback(clientAttributesCallback);
|
||||
@ -495,6 +500,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
|
||||
assertThat(update).as("ws update received").isNotBlank();
|
||||
|
||||
client.subscribeAndWait(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, MqttQoS.AT_LEAST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(device.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
|
||||
MqttTestCallback clientAttributesCallback = new MqttTestCallback(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC);
|
||||
client.setCallback(clientAttributesCallback);
|
||||
|
||||
@ -223,7 +223,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractM
|
||||
|
||||
MqttTestCallback callback = new MqttTestCallback(GATEWAY_RPC_TOPIC);
|
||||
client.setCallback(callback);
|
||||
client.subscribeAndWait(GATEWAY_RPC_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
subscribeAndCheckSubscription(client, GATEWAY_RPC_TOPIC, savedDevice.getId(), FeatureType.RPC);
|
||||
|
||||
String setGpioRequest = "{\"method\": \"toggle_gpio\", \"params\": {\"pin\":1}}";
|
||||
|
||||
@ -21,6 +21,7 @@ import org.junit.Before;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
|
||||
@ -104,6 +105,7 @@ public abstract class AbstractAttributesMqttV5Test extends AbstractMqttV5Test {
|
||||
MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback();
|
||||
client.setCallback(onUpdateCallback);
|
||||
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
|
||||
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
|
||||
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
|
||||
|
||||
@ -25,6 +25,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
|
||||
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
|
||||
import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
|
||||
@ -101,6 +102,7 @@ public abstract class AbstractMqttV5ClientConnectionTest extends AbstractMqttInt
|
||||
MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback();
|
||||
client.setCallback(onUpdateCallback);
|
||||
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
|
||||
String payload = "{\"sharedStr\":\"" + StringUtils.repeat("*", valueLen) + "\"}";
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
|
||||
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
|
||||
import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
|
||||
|
||||
@ -34,6 +35,7 @@ public abstract class AbstractMqttV5ClientSubscriptionTest extends AbstractMqttI
|
||||
client.connectAndWait(accessToken);
|
||||
|
||||
IMqttToken subscriptionResult = client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
|
||||
MqttWireMessage response = subscriptionResult.getResponse();
|
||||
|
||||
@ -52,6 +54,7 @@ public abstract class AbstractMqttV5ClientSubscriptionTest extends AbstractMqttI
|
||||
client.connectAndWait(accessToken);
|
||||
|
||||
IMqttToken iMqttToken = client.subscribeAndWait("wrong/topic/+", MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 0);
|
||||
Assert.assertEquals(MESSAGE_TYPE_SUBACK,iMqttToken.getResponse().getType());
|
||||
MqttSubAck subAck = (MqttSubAck) iMqttToken.getResponse();
|
||||
Assert.assertEquals(1, subAck.getReturnCodes().length);
|
||||
|
||||
@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.packet.MqttUnsubAck;
|
||||
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
|
||||
import org.junit.Assert;
|
||||
import org.thingsboard.server.common.data.device.profile.MqttTopics;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
|
||||
|
||||
@ -34,6 +35,7 @@ public abstract class AbstractMqttV5ClientUnsubscribeTest extends AbstractMqttIn
|
||||
client.connectAndWait(accessToken);
|
||||
|
||||
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.ATTRIBUTES, 1);
|
||||
IMqttToken unsubscribeResult = client.unsubscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
|
||||
MqttWireMessage response = unsubscribeResult.getResponse();
|
||||
Assert.assertEquals(MESSAGE_TYPE_UNSUBACK, response.getType());
|
||||
|
||||
@ -22,6 +22,7 @@ import org.eclipse.paho.mqttv5.common.MqttException;
|
||||
import org.eclipse.paho.mqttv5.common.MqttMessage;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
|
||||
@ -45,6 +46,7 @@ public abstract class AbstractMqttV5RpcTest extends AbstractMqttV5Test {
|
||||
MqttV5TestCallback callback = new MqttV5TestCallback(DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0"));
|
||||
client.setCallback(callback);
|
||||
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.RPC, 1);
|
||||
|
||||
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
|
||||
String result = doPostAsync("/api/rpc/oneway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk());
|
||||
@ -59,6 +61,7 @@ public abstract class AbstractMqttV5RpcTest extends AbstractMqttV5Test {
|
||||
MqttV5TestClient client = new MqttV5TestClient();
|
||||
client.connectAndWait(accessToken);
|
||||
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE);
|
||||
awaitForDeviceActorToReceiveSubscription(savedDevice.getId(), FeatureType.RPC, 1);
|
||||
MqttV5TestRpcCallback callback = new MqttV5TestRpcCallback(client, DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0"));
|
||||
client.setCallback(callback);
|
||||
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user