diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java index 6543955ed5..9856aa18cb 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesIntegrationTest.java @@ -25,10 +25,10 @@ import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; -import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; @@ -43,6 +43,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.any; @@ -112,6 +113,42 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2); } + @Test + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + String devicePayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]"; + String payloadA = "{\"Device A\": " + devicePayload + "}"; + + String deviceBPayload = "[{\"ts\": 10000, \"values\": " + PAYLOAD_VALUES_STR + "}]"; + String payloadB = "{\"Device B\": " + deviceBPayload + "}"; + + testAckIsReceivedOnFailedPublishMessage("Device A", payloadA.getBytes(), "Device B", payloadB.getBytes()); + } + + protected void testAckIsReceivedOnFailedPublishMessage(String deviceName1, byte[] payload1, String deviceName2, byte[] payload2) throws Exception { + updateDefaultTenantProfileConfig(profileConfiguration -> { + profileConfiguration.setMaxDevices(3); + }); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload1); + + // check device is created + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertNotNull(doGet("/api/tenant/devices?deviceName=" + deviceName1, Device.class)); + }); + + client.publishAndWait(GATEWAY_TELEMETRY_TOPIC, payload2); + client.disconnectAndWait(); + + // check device was not created due to limit + doGet("/api/tenant/devices?deviceName=" + deviceName2).andExpect(status().isNotFound()); + + updateDefaultTenantProfileConfig(profileConfiguration -> { + profileConfiguration.setMaxDevices(0); + }); + } + @Test public void testGatewayConnect() throws Exception { String payload = "{\"device\":\"Device A\"}"; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java index a23506a160..92b35d896c 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java @@ -186,4 +186,15 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract assertFalse(callback.isPubAckReceived()); } + @Override + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Post Telemetry device json payload") + .gatewayName("Test Post Telemetry gateway json payload") + .transportPayloadType(TransportPayloadType.JSON) + .telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC) + .build(); + processBeforeTest(configProperties); + super.testAckIsReceivedOnFailedPublishMessage(); + } } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java index 1b6c247d96..60e5857c04 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java @@ -459,6 +459,31 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac assertFalse(callback.isPubAckReceived()); } + @Override + public void testAckIsReceivedOnFailedPublishMessage() throws Exception { + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Post Telemetry device proto payload") + .gatewayName("Test Post Telemetry gateway proto payload") + .transportPayloadType(TransportPayloadType.PROTOBUF) + .telemetryTopicFilter(POST_DATA_TELEMETRY_TOPIC) + .build(); + processBeforeTest(configProperties); + + TransportApiProtos.GatewayTelemetryMsg.Builder gatewayTelemetryMsgProtoBuilder = TransportApiProtos.GatewayTelemetryMsg.newBuilder(); + List expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); + String deviceName1 = "Device A"; + String deviceName2 = "Device B"; + TransportApiProtos.TelemetryMsg deviceATelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName1, expectedKeys, 10000, 20000); + gatewayTelemetryMsgProtoBuilder.addAllMsg(List.of(deviceATelemetryMsgProto)); + TransportApiProtos.GatewayTelemetryMsg payload1 = gatewayTelemetryMsgProtoBuilder.build(); + + TransportApiProtos.TelemetryMsg deviceBTelemetryMsgProto = getDeviceTelemetryMsgProto(deviceName2, expectedKeys, 10000, 20000); + TransportApiProtos.GatewayTelemetryMsg payload2 = TransportApiProtos.GatewayTelemetryMsg.newBuilder() + .addAllMsg(List.of(deviceBTelemetryMsgProto)) + .build(); + super.testAckIsReceivedOnFailedPublishMessage(deviceName1, payload1.toByteArray(), deviceName2, payload2.toByteArray()); + } + private DynamicSchema getDynamicSchema() { DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);