diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java index cf2c751264..f6dc7ed579 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesJsonIntegrationTest.java @@ -108,4 +108,28 @@ public abstract class AbstractMqttTimeseriesJsonIntegrationTest extends Abstract assertFalse(callback.isPubAckReceived()); } + @Test + public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorEnabled() throws Exception { + processBeforeTest("Test Post Telemetry device json payload", "Test Post Telemetry gateway json payload", TransportPayloadType.JSON, POST_DATA_TELEMETRY_TOPIC, null, true); + CountDownLatch latch = new CountDownLatch(1); + MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + TestMqttPublishCallback callback = new TestMqttPublishCallback(latch); + client.setCallback(callback); + publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC); + latch.await(3, TimeUnit.SECONDS); + assertTrue(callback.isPubAckReceived()); + } + + @Test + public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorDisabled() throws Exception { + processBeforeTest("Test Post Telemetry device json payload", "Test Post Telemetry gateway json payload", TransportPayloadType.JSON, POST_DATA_TELEMETRY_TOPIC, null, false); + CountDownLatch latch = new CountDownLatch(1); + MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + TestMqttPublishCallback callback = new TestMqttPublishCallback(latch); + client.setCallback(callback); + publishMqttMsg(client, MALFORMED_JSON_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC); + latch.await(3, TimeUnit.SECONDS); + assertFalse(callback.isPubAckReceived()); + } + } diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java index fb6a8e1f9b..f5eb2cfd7a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/telemetry/timeseries/AbstractMqttTimeseriesProtoIntegrationTest.java @@ -333,6 +333,30 @@ public abstract class AbstractMqttTimeseriesProtoIntegrationTest extends Abstrac assertFalse(callback.isPubAckReceived()); } + @Test + public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorEnabled() throws Exception { + processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, true); + CountDownLatch latch = new CountDownLatch(1); + MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + TestMqttPublishCallback callback = new TestMqttPublishCallback(latch); + client.setCallback(callback); + publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC); + latch.await(3, TimeUnit.SECONDS); + assertTrue(callback.isPubAckReceived()); + } + + @Test + public void testPushTelemetryGatewayWithMalformedPayloadAndSendPubAckOnErrorDisabled() throws Exception { + processBeforeTest("Test Post Telemetry device proto payload", "Test Post Telemetry gateway proto payload", TransportPayloadType.PROTOBUF, POST_DATA_TELEMETRY_TOPIC, null, false); + CountDownLatch latch = new CountDownLatch(1); + MqttAsyncClient client = getMqttAsyncClient(gatewayAccessToken); + TestMqttPublishCallback callback = new TestMqttPublishCallback(latch); + client.setCallback(callback); + publishMqttMsg(client, MALFORMED_PROTO_PAYLOAD.getBytes(), POST_DATA_TELEMETRY_TOPIC); + latch.await(3, TimeUnit.SECONDS); + assertFalse(callback.isPubAckReceived()); + } + private DynamicSchema getDynamicSchema(String deviceTelemetryProtoSchema) { DeviceProfileTransportConfiguration transportConfiguration = deviceProfile.getProfileData().getTransportConfiguration(); assertTrue(transportConfiguration instanceof MqttDeviceProfileTransportConfiguration);