rafactoring due to comments
This commit is contained in:
parent
805deffaa2
commit
dd3ed2a496
@ -51,7 +51,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
private int maxInflightMessages;
|
private int maxInflightMessages;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getServiceConfigurationRpcForDeviceTest() throws Exception {
|
public void getSessionLimitsRpcForDeviceTest() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||||
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
||||||
@ -75,29 +75,30 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
client.setCallback(callback);
|
client.setCallback(callback);
|
||||||
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||||
|
|
||||||
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes());
|
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes());
|
||||||
|
|
||||||
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
|
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
|
||||||
.as("await callback").isTrue();
|
.as("await callback").isTrue();
|
||||||
|
|
||||||
var payload = callback.getPayloadBytes();
|
var payload = callback.getPayloadBytes();
|
||||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {
|
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
||||||
});
|
|
||||||
|
|
||||||
assertNotNull(response);
|
assertNotNull(response);
|
||||||
assertEquals(response.size(), 6);
|
assertEquals(4, response.size());
|
||||||
assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit());
|
|
||||||
assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit());
|
|
||||||
assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit());
|
|
||||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||||
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
||||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||||
|
Map<String, String> rateLimits = (Map<String, String>) response.get("rateLimits");
|
||||||
|
assertEquals(3, rateLimits.size());
|
||||||
|
assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportDeviceMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||||
|
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getServiceConfigurationRpcForGatewayTest() throws Exception {
|
public void getSessionLimitsRpcForGatewayTest() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||||
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
||||||
@ -125,25 +126,26 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
client.setCallback(callback);
|
client.setCallback(callback);
|
||||||
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||||
|
|
||||||
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes());
|
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes());
|
||||||
|
|
||||||
assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||||
|
|
||||||
var payload = callback.getPayloadBytes();
|
var payload = callback.getPayloadBytes();
|
||||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {
|
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
||||||
});
|
|
||||||
|
|
||||||
assertNotNull(response);
|
assertNotNull(response);
|
||||||
assertEquals(response.size(), 9);
|
assertEquals(4, response.size());
|
||||||
assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit());
|
|
||||||
assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit());
|
|
||||||
assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit());
|
|
||||||
assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit());
|
|
||||||
assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
|
||||||
assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
|
||||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||||
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
assertEquals(response.get("maxInflightMessages"), maxInflightMessages);
|
||||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||||
|
Map<String, String> rateLimits = (Map<String, String>) response.get("rateLimits");
|
||||||
|
assertEquals(6, rateLimits.size());
|
||||||
|
assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportGatewayMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit());
|
||||||
|
assertEquals(rateLimits.get("deviceMessages"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("deviceTelemetryMessages"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
||||||
|
assertEquals(rateLimits.get("deviceTelemetryDataPoints"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
||||||
|
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -58,11 +58,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
|||||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
|
||||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
|
||||||
import org.thingsboard.server.common.msg.EncryptionUtil;
|
import org.thingsboard.server.common.msg.EncryptionUtil;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||||
@ -135,7 +133,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
|
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
|
||||||
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
|
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
|
||||||
|
|
||||||
private static final String SERVICE_CONFIGURATION = "getServiceConfiguration";
|
private static final String SESSION_LIMITS = "getSessionLimits";
|
||||||
|
|
||||||
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
|
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
|
||||||
|
|
||||||
@ -500,8 +498,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
|
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
|
||||||
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
|
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
|
||||||
toServerRpcSubTopicType = TopicType.V1;
|
toServerRpcSubTopicType = TopicType.V1;
|
||||||
if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) {
|
if (SESSION_LIMITS.equals(rpcRequestMsg.getMethodName())) {
|
||||||
onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg);
|
onGetSessionLimitsRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg);
|
||||||
} else {
|
} else {
|
||||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
||||||
}
|
}
|
||||||
@ -1340,32 +1338,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
|
private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
|
||||||
var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId());
|
var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId());
|
||||||
DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration();
|
DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration();
|
||||||
Map<String, Object> serviceConfiguration = new HashMap<>();
|
Map<String, Object> sessionLimits = new HashMap<>();
|
||||||
|
Map<String, String> rateLimits = new HashMap<>();
|
||||||
|
|
||||||
if (sessionInfo.getIsGateway()) {
|
if (sessionInfo.getIsGateway()) {
|
||||||
serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit());
|
rateLimits.put("messages", profile.getTransportGatewayMsgRateLimit());
|
||||||
serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit());
|
rateLimits.put("telemetryMessages", profile.getTransportGatewayTelemetryMsgRateLimit());
|
||||||
serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit());
|
rateLimits.put("telemetryDataPoints", profile.getTransportGatewayTelemetryDataPointsRateLimit());
|
||||||
serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit());
|
rateLimits.put("deviceMessages", profile.getTransportGatewayDeviceMsgRateLimit());
|
||||||
serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
rateLimits.put("deviceTelemetryMessages", profile.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
||||||
serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
rateLimits.put("deviceTelemetryDataPoints", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
||||||
} else {
|
} else {
|
||||||
serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit());
|
rateLimits.put("messages", profile.getTransportDeviceMsgRateLimit());
|
||||||
serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit());
|
rateLimits.put("telemetryMessages", profile.getTransportDeviceTelemetryMsgRateLimit());
|
||||||
serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit());
|
rateLimits.put("telemetryDataPoints", profile.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||||
}
|
}
|
||||||
serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize());
|
sessionLimits.put("rateLimits", rateLimits);
|
||||||
serviceConfiguration.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit());
|
sessionLimits.put("maxPayloadSize", context.getMaxPayloadSize());
|
||||||
serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType());
|
sessionLimits.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit());
|
||||||
|
sessionLimits.put("payloadType", deviceSessionCtx.getPayloadType());
|
||||||
|
|
||||||
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
|
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
|
||||||
|
|
||||||
TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder()
|
TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder()
|
||||||
.setRequestId(rpcRequestMsg.getRequestId())
|
.setRequestId(rpcRequestMsg.getRequestId())
|
||||||
.setPayload(JacksonUtil.toString(serviceConfiguration))
|
.setPayload(JacksonUtil.toString(sessionLimits))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
onToServerRpcResponse(responseMsg);
|
onToServerRpcResponse(responseMsg);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user