updated timeseries and attributes tests & cleanup code

This commit is contained in:
ShvaykaD 2022-05-12 10:59:39 +03:00
parent 6d89960754
commit d7291d1171
13 changed files with 171 additions and 222 deletions

View File

@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
import java.util.concurrent.CountDownLatch;
@ -32,6 +33,7 @@ public class MqttTestCallback implements MqttCallback {
private int qoS;
private byte[] payloadBytes;
private String awaitSubTopic;
private boolean pubAckReceived;
public MqttTestCallback(String awaitSubTopic) {
this.subscribeLatch = new CountDownLatch(1);
@ -47,6 +49,7 @@ public class MqttTestCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
log.warn("connectionLost: ", throwable);
deliveryLatch.countDown();
}
@Override
@ -69,6 +72,7 @@ public class MqttTestCallback implements MqttCallback {
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.warn("delivery complete: {}", iMqttDeliveryToken.getResponse());
pubAckReceived = iMqttDeliveryToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK;
deliveryLatch.countDown();
}

View File

@ -16,14 +16,17 @@
package org.thingsboard.server.transport.mqtt.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
@Slf4j
@DaoSqlTest
public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends AbstractMqttAttributesIntegrationTest {
@ -36,7 +39,7 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
.enableCompatibilityWithJsonPayloadFormat(true)
.build();
processBeforeTest(configProperties);
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
@ -48,7 +51,7 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
.useJsonPayloadFormatForDefaultDownlinkTopics(true)
.build();
super.processBeforeTest(configProperties);
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
@ -60,7 +63,7 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
.useJsonPayloadFormatForDefaultDownlinkTopics(true)
.build();
super.processBeforeTest(configProperties);
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
@ -72,7 +75,7 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
.useJsonPayloadFormatForDefaultDownlinkTopics(true)
.build();
super.processBeforeTest(configProperties);
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test
@ -84,7 +87,7 @@ public class MqttAttributesUpdatesBackwardCompatibilityIntegrationTest extends A
.useJsonPayloadFormatForDefaultDownlinkTopics(true)
.build();
super.processBeforeTest(configProperties);
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
}
@Test

View File

@ -19,11 +19,14 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
@Slf4j
@DaoSqlTest
public class MqttAttributesUpdatesIntegrationTest extends AbstractMqttAttributesIntegrationTest {
@ -40,17 +43,17 @@ public class MqttAttributesUpdatesIntegrationTest extends AbstractMqttAttributes
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test

View File

@ -19,11 +19,14 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
@Slf4j
@DaoSqlTest
public class MqttAttributesUpdatesJsonIntegrationTest extends AbstractMqttAttributesIntegrationTest {
@ -40,17 +43,17 @@ public class MqttAttributesUpdatesJsonIntegrationTest extends AbstractMqttAttrib
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
public void testJsonSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test

View File

@ -19,11 +19,15 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
@Slf4j
@DaoSqlTest
public class MqttAttributesUpdatesProtoIntegrationTest extends AbstractMqttAttributesIntegrationTest {
@ -40,22 +44,22 @@ public class MqttAttributesUpdatesProtoIntegrationTest extends AbstractMqttAttri
@Test
public void testProtoSubscribeToAttributesUpdatesFromTheServer() throws Exception {
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_TOPIC);
}
@Test
public void testProtoSubscribeToAttributesUpdatesFromTheServerOnShortTopic() throws Exception {
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_TOPIC);
}
@Test
public void testProtoSubscribeToAttributesUpdatesFromTheServerOnShortJsonTopic() throws Exception {
processJsonTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
processJsonTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC);
}
@Test
public void testProtoSubscribeToAttributesUpdatesFromTheServerOnShortProtoTopic() throws Exception {
processProtoTestSubscribeToAttributesUpdates(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
processProtoTestSubscribeToAttributesUpdates(DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC);
}
@Test

View File

@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.ClaimRequest;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import org.thingsboard.server.dao.device.claim.ClaimResult;
@ -35,6 +34,7 @@ import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_CLAIM_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CLAIM_TOPIC;
@Slf4j
@ -112,7 +112,7 @@ public class MqttClaimDeviceTest extends AbstractMqttIntegrationTest {
}
protected void validateClaimResponse(boolean emptyPayload, MqttTestClient client, byte[] payloadBytes, byte[] failurePayloadBytes) throws Exception {
client.publishAndWait(MqttTopics.DEVICE_CLAIM_TOPIC, failurePayloadBytes);
client.publishAndWait(DEVICE_CLAIM_TOPIC, failurePayloadBytes);
loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
ClaimRequest claimRequest;
@ -130,7 +130,8 @@ public class MqttClaimDeviceTest extends AbstractMqttIntegrationTest {
assertEquals(claimResponse, ClaimResponse.FAILURE);
client.publishAndWait(MqttTopics.DEVICE_CLAIM_TOPIC, payloadBytes);
client.publishAndWait(DEVICE_CLAIM_TOPIC, payloadBytes);
client.disconnect();
ClaimResult claimResult = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk()),
@ -170,6 +171,7 @@ public class MqttClaimDeviceTest extends AbstractMqttIntegrationTest {
assertEquals(claimResponse, ClaimResponse.FAILURE);
client.publishAndWait(GATEWAY_CLAIM_TOPIC, payloadBytes);
client.disconnect();
ClaimResult claimResult = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + deviceName + "/claim", claimRequest, ClaimResult.class, status().isOk()),

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.transport.mqtt.claim;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;

View File

@ -17,18 +17,12 @@ package org.thingsboard.server.transport.mqtt.credentials;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest;
@ -44,6 +38,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC;
@DaoSqlTest
public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
@ -138,7 +133,7 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
private void testTelemetryIsDelivered(Device device, MqttTestClient client, boolean ok) throws Exception {
String randomKey = RandomStringUtils.randomAlphanumeric(10);
List<String> expectedKeys = Arrays.asList(randomKey);
client.publishAndWait(MqttTopics.DEVICE_TELEMETRY_TOPIC, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes());
client.publishAndWait(DEVICE_TELEMETRY_TOPIC, JacksonUtil.toString(JacksonUtil.newObjectNode().put(randomKey, true)).getBytes());
String deviceId = device.getId().getId().toString();
@ -168,23 +163,6 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
client.disconnect();
}
protected MqttAsyncClient getMqttAsyncClient(String clientId, String username, String password) throws MqttException {
if (StringUtils.isEmpty(clientId)) {
clientId = MqttAsyncClient.generateClientId();
}
MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
if (StringUtils.isNotEmpty(username)) {
options.setUserName(username);
}
if (StringUtils.isNotEmpty(password)) {
options.setPassword(password.toCharArray());
}
client.connect(options).waitForCompletion();
return client;
}
private Device createDevice(String deviceName, BasicMqttCredentials clientIdCredValue) throws Exception {
Device device = new Device();
device.setName(deviceName);

View File

@ -18,14 +18,13 @@ package org.thingsboard.server.transport.mqtt.telemetry.attributes;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;
@ -38,6 +37,10 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
@Slf4j
@DaoSqlTest
@ -58,19 +61,19 @@ public class MqttAttributesIntegrationTest extends AbstractMqttIntegrationTest {
@Test
public void testPushAttributes() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(DEVICE_ATTRIBUTES_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(DEVICE_ATTRIBUTES_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
public void testPushAttributesOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes());
}
@Test
@ -87,9 +90,11 @@ public class MqttAttributesIntegrationTest extends AbstractMqttIntegrationTest {
}
protected void processAttributesTest(String topic, List<String> expectedKeys, byte[] payload, boolean presenceFieldsTest) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
publishMqttMsg(client, payload, topic);
client.publishAndWait(topic, payload);
client.disconnect();
DeviceId deviceId = savedDevice.getId();
@ -125,9 +130,11 @@ public class MqttAttributesIntegrationTest extends AbstractMqttIntegrationTest {
}
protected void processGatewayAttributesTest(List<String> expectedKeys, byte[] payload, String firstDeviceName, String secondDeviceName) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
publishMqttMsg(client, payload, MqttTopics.GATEWAY_ATTRIBUTES_TOPIC);
client.publishAndWait(GATEWAY_ATTRIBUTES_TOPIC, payload);
client.disconnect();
Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
20,
@ -141,6 +148,7 @@ public class MqttAttributesIntegrationTest extends AbstractMqttIntegrationTest {
assertNotNull(secondDevice);
// todo removed sleep
Thread.sleep(2000);
List<String> firstDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() {});

View File

@ -25,7 +25,6 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.dao.service.DaoSqlTest;
@ -38,6 +37,9 @@ import java.util.List;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;
@Slf4j
@DaoSqlTest
@ -122,7 +124,7 @@ public class MqttAttributesProtoIntegrationTest extends MqttAttributesIntegratio
.build();
processBeforeTest(configProperties);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
processAttributesTest(DEVICE_ATTRIBUTES_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test
@ -133,7 +135,7 @@ public class MqttAttributesProtoIntegrationTest extends MqttAttributesIntegratio
.attributesTopicFilter(POST_DATA_ATTRIBUTES_TOPIC)
.build();
processBeforeTest(configProperties);
processJsonPayloadAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes());
processJsonPayloadAttributesTest(DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes());
}
@Test
@ -145,7 +147,7 @@ public class MqttAttributesProtoIntegrationTest extends MqttAttributesIntegratio
.build();
processBeforeTest(configProperties);
DynamicMessage postAttributesMsg = getDefaultDynamicMessage();
processAttributesTest(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
processAttributesTest(DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postAttributesMsg.toByteArray(), false);
}
@Test

View File

@ -18,19 +18,13 @@ package org.thingsboard.server.transport.mqtt.telemetry.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;
@ -38,12 +32,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
@Slf4j
public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest {
@ -66,26 +65,26 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
@Test
public void testPushTelemetry() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(DEVICE_TELEMETRY_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryWithTs() throws Exception {
String payloadStr = "{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}";
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
processJsonPayloadTelemetryTest(DEVICE_TELEMETRY_TOPIC, expectedKeys, payloadStr.getBytes(), true);
}
@Test
public void testPushTelemetryOnShortTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(DEVICE_TELEMETRY_SHORT_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
public void testPushTelemetryOnShortJsonTopic() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(DEVICE_TELEMETRY_SHORT_JSON_TOPIC, expectedKeys, PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
@ -94,14 +93,15 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
String deviceName1 = "Device A";
String deviceName2 = "Device B";
String payload = getGatewayTelemetryJsonPayload(deviceName1, deviceName2, "10000", "20000");
processGatewayTelemetryTest(MqttTopics.GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
}
@Test
public void testGatewayConnect() throws Exception {
String payload = "{\"device\":\"Device A\"}";
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload.getBytes(), MqttTopics.GATEWAY_CONNECT_TOPIC);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publish(GATEWAY_CONNECT_TOPIC, payload.getBytes());
String deviceName = "Device A";
@ -110,6 +110,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
100);
assertNotNull(device);
client.disconnect();
}
protected void processJsonPayloadTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, boolean withTs) throws Exception {
@ -117,8 +118,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
}
protected void processTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, boolean withTs, boolean presenceFieldsTest) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(accessToken);
publishMqttMsg(client, payload, topic);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
client.publishAndWait(topic, payload);
client.disconnect();
String deviceId = savedDevice.getId().getId().toString();
@ -187,9 +190,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
}
protected void processGatewayTelemetryTest(String topic, List<String> expectedKeys, byte[] payload, String firstDeviceName, String secondDeviceName) throws Exception {
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, payload, topic);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(topic, payload);
client.disconnect();
Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
20,
@ -203,6 +207,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
assertNotNull(secondDevice);
// TODO remove sleep
Thread.sleep(2000);
List<String> firstDeviceActualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + firstDevice.getId() + "/keys/timeseries", new TypeReference<>() {});
@ -312,93 +317,20 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
// @Test - Unstable
public void testMqttQoSLevel() throws Exception {
String clientId = MqttAsyncClient.generateClientId();
MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(accessToken);
CountDownLatch latch = new CountDownLatch(1);
TestMqttCallback callback = new TestMqttCallback(client, latch);
MqttTestClient client = new MqttTestClient();
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
client.connect(options).waitForCompletion(5000);
client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value());
client.connectAndWait(accessToken);
client.subscribe(DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
String payload = "{\"key\":\"uniqueValue\"}";
// TODO 3.1: we need to acknowledge subscription only after it is processed by device actor and not when the message is pushed to queue.
// MqttClient -> SUB REQUEST -> Transport -> Kafka -> Device Actor (subscribed)
// MqttClient <- SUB_ACK <- Transport
Thread.sleep(5000);
doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk());
latch.await(10, TimeUnit.SECONDS);
assertEquals(payload, callback.getPayload());
callback.getSubscribeLatch().await(10, TimeUnit.SECONDS);
assertEquals(payload.getBytes(), callback.getPayloadBytes());
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
}
private static class TestMqttCallback implements MqttCallback {
private final MqttAsyncClient client;
private final CountDownLatch latch;
private volatile Integer qoS;
private volatile String payload;
String getPayload() {
return payload;
}
TestMqttCallback(MqttAsyncClient client, CountDownLatch latch) {
this.client = client;
this.latch = latch;
}
int getQoS() {
return qoS;
}
@Override
public void connectionLost(Throwable throwable) {
log.error("Client connection lost", throwable);
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
payload = new String(mqttMessage.getPayload());
qoS = mqttMessage.getQos();
latch.countDown();
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}
public static class TestMqttPublishCallback implements MqttCallback {
private final CountDownLatch latch;
private boolean pubAckReceived;
public boolean isPubAckReceived() {
return pubAckReceived;
}
public TestMqttPublishCallback(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void connectionLost(Throwable throwable) {
latch.countDown();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
pubAckReceived = iMqttDeliveryToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK;
latch.countDown();
}
}
}

View File

@ -16,15 +16,15 @@
package org.thingsboard.server.transport.mqtt.telemetry.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
@ -121,13 +121,14 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
.sendAckOnValidationException(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertTrue(callback.isPubAckReceived());
client.disconnect();
}
@Test
@ -138,12 +139,12 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertFalse(callback.isPubAckReceived());
}
@ -157,13 +158,14 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
.sendAckOnValidationException(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertTrue(callback.isPubAckReceived());
client.disconnect();
}
@Test
@ -175,12 +177,12 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertFalse(callback.isPubAckReceived());
}

View File

@ -20,28 +20,32 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadConfiguration;
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
@Slf4j
public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends AbstractMqttTimeseriesIntegrationTest {
@ -272,7 +276,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.build();
processBeforeTest(configProperties);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
processTelemetryTest(DEVICE_TELEMETRY_SHORT_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
@ -283,7 +287,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
processJsonPayloadTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes(), false);
processJsonPayloadTelemetryTest(DEVICE_TELEMETRY_SHORT_JSON_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), PAYLOAD_VALUES_STR.getBytes(), false);
}
@Test
@ -295,7 +299,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.build();
processBeforeTest(configProperties);
DynamicMessage postTelemetryMsg = getDefaultDynamicMessage();
processTelemetryTest(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
processTelemetryTest(DEVICE_TELEMETRY_SHORT_PROTO_TOPIC, Arrays.asList("key1", "key2", "key3", "key4", "key5"), postTelemetryMsg.toByteArray(), false, false);
}
@Test
@ -315,7 +319,7 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
TransportApiProtos.TelemetryMsg deviceBTelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName2, expectedKeys, 10000, 20000);
gatewayTelemetryMsgProtoBuilder.addAllMsg(Arrays.asList(deviceATelemetryMsgProto, deviceBTelemetryMsgProto));
TransportApiProtos.GatewayTelemetryMsg gatewayTelemetryMsg = gatewayTelemetryMsgProtoBuilder.build();
processGatewayTelemetryTest(MqttTopics.GATEWAY_TELEMETRY_TOPIC, expectedKeys, gatewayTelemetryMsg.toByteArray(), deviceName1, deviceName2);
processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, gatewayTelemetryMsg.toByteArray(), deviceName1, deviceName2);
}
@Test
@ -329,14 +333,16 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
processBeforeTest(configProperties);
String deviceName = "Device A";
TransportApiProtos.ConnectMsg connectMsgProto = getConnectProto(deviceName);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
publishMqttMsg(client, connectMsgProto.toByteArray(), MqttTopics.GATEWAY_CONNECT_TOPIC);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publish(GATEWAY_CONNECT_TOPIC, connectMsgProto.toByteArray());
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20,
100);
assertNotNull(device);
client.disconnect();
}
@ -349,13 +355,14 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.sendAckOnValidationException(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_PROTO_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertTrue(callback.isPubAckReceived());
client.disconnect();
}
@Test
@ -366,12 +373,12 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_PROTO_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertFalse(callback.isPubAckReceived());
}
@ -385,13 +392,14 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.sendAckOnValidationException(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertTrue(callback.isPubAckReceived());
client.disconnect();
}
@Test
@ -403,12 +411,12 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.enableCompatibilityWithJsonPayloadFormat(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(accessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_JSON_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertFalse(callback.isPubAckReceived());
}
@ -422,13 +430,14 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.sendAckOnValidationException(true)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_PROTO_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertTrue(callback.isPubAckReceived());
client.disconnect();
}
@Test
@ -440,12 +449,12 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
CountDownLatch latch = new CountDownLatch(1);
MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken);
TestMqttPublishCallback callback = new TestMqttPublishCallback(latch);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
MqttTestCallback callback = new MqttTestCallback();
client.setCallback(callback);
publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC);
latch.await(3, TimeUnit.SECONDS);
client.publish(POST_DATA_TELEMETRY_TOPIC, MALFORMED_PROTO_PAYLOAD.getBytes());
callback.getDeliveryLatch().await(3, TimeUnit.SECONDS);
assertFalse(callback.isPubAckReceived());
}