minor refactoring
This commit is contained in:
parent
80d69df5f4
commit
b5fff2e44e
@ -49,8 +49,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
@Value("${transport.mqtt.netty.max_payload_size}")
|
@Value("${transport.mqtt.netty.max_payload_size}")
|
||||||
private Integer maxPayloadSize;
|
private Integer maxPayloadSize;
|
||||||
|
|
||||||
@Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
|
@Value("${transport.mqtt.msg_queue_size_per_device_limit}")
|
||||||
private int maxQueueSize;
|
private int maxInflightMessages;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void beforeTest() throws Exception {
|
public void beforeTest() throws Exception {
|
||||||
@ -110,7 +110,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit());
|
assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit());
|
||||||
assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit());
|
assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||||
assertEquals(response.get("maxQueueSize"), maxQueueSize);
|
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
||||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||||
|
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
@ -160,7 +160,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
||||||
assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
||||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||||
assertEquals(response.get("maxQueueSize"), maxQueueSize);
|
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
||||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||||
|
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
|
|||||||
@ -1362,7 +1362,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit());
|
serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||||
}
|
}
|
||||||
serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize());
|
serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize());
|
||||||
serviceConfiguration.put("maxQueueSize", context.getMessageQueueSizePerDeviceLimit());
|
serviceConfiguration.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit());
|
||||||
serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType());
|
serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType());
|
||||||
|
|
||||||
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
|
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user