added tests

This commit is contained in:
dashevchenko 2025-09-29 16:16:15 +03:00
parent f9b52c3a43
commit 5e9ae421be
3 changed files with 74 additions and 1 deletions

View File

@ -25,10 +25,10 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
@ -43,6 +43,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.any;
@ -112,6 +113,42 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
}
@Test
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
String devicePayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]";
String payloadA = "{\"Device A\": " + devicePayload + "}";
String deviceBPayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]";
String payloadB = "{\"Device B\": " + deviceBPayload + "}";
testAckIsReceivedOnFailedPublishMessage("Device A", payloadA.getBytes(), "Device B", payloadB.getBytes());
}
protected void testAckIsReceivedOnFailedPublishMessage(String deviceName1, byte[] payload1, String deviceName2, byte[] payload2) throws Exception {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setMaxDevices(3);
});
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload1);
// check device is created
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
assertNotNull(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class));
});
client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload2);
client.disconnectAndWait();
// check device was not created due to limit
doGet("/api/tenant/devices?deviceName=" + deviceName2).andExpect(status().isNotFound());
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setMaxDevices(0);
});
}
@Test
public void testGatewayConnect() throws Exception {
String payload = "{\"device\":\"Device A\"}";

View File

@ -186,4 +186,15 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract
assertFalse(callback.isPubAckReceived());
}
@Override
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Telemetry device json payload")
.gatewayName("Test Post Telemetry gateway json payload")
.transportPayloadType(TransportPayloadType.JSON)
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
super.testAckIsReceivedOnFailedPublishMessage();
}
}

View File

@ -459,6 +459,31 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac
assertFalse(callback.isPubAckReceived());
}
@Override
public void testAckIsReceivedOnFailedPublishMessage() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Telemetry device proto payload")
.gatewayName("Test Post Telemetry gateway proto payload")
.transportPayloadType(TransportPayloadType.PROTOBUF)
.telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC)
.build();
processBeforeTest(configProperties);
TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder();
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
String deviceName1 = "Device A";
String deviceName2 = "Device B";
TransportApiProtos.TelemetryMsg deviceATelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName1, expectedKeys, 10000, 20000);
gatewayTelemetryMsgProtoBuilder.addAllMsg(List.of(deviceATelemetryMsgProto));
TransportApiProtos.GatewayTelemetryMsg payload1 = gatewayTelemetryMsgProtoBuilder.build();
TransportApiProtos.TelemetryMsg deviceBTelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName2, expectedKeys, 10000, 20000);
TransportApiProtos.GatewayTelemetryMsg payload2 = TransportApiProtos.GatewayTelemetryMsg.newBuilder()
.addAllMsg(List.of(deviceBTelemetryMsgProto))
.build();
super.testAckIsReceivedOnFailedPublishMessage(deviceName1, payload1.toByteArray(), deviceName2, payload2.toByteArray());
}
private DynamicSchema getDynamicSchema() {
DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration();
assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);