refactored due to comments
This commit is contained in:
parent
b5fff2e44e
commit
805deffaa2
@ -17,8 +17,6 @@ package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
@ -52,30 +50,11 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
@Value("${transport.mqtt.msg_queue_size_per_device_limit}")
|
@Value("${transport.mqtt.msg_queue_size_per_device_limit}")
|
||||||
private int maxInflightMessages;
|
private int maxInflightMessages;
|
||||||
|
|
||||||
@Before
|
|
||||||
public void beforeTest() throws Exception {
|
|
||||||
loginSysAdmin();
|
|
||||||
|
|
||||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
|
||||||
Assert.assertNotNull(tenantProfile);
|
|
||||||
|
|
||||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
|
||||||
|
|
||||||
profileConfiguration.setTransportGatewayMsgRateLimit(null);
|
|
||||||
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null);
|
|
||||||
profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null);
|
|
||||||
|
|
||||||
profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null);
|
|
||||||
profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null);
|
|
||||||
profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null);
|
|
||||||
|
|
||||||
doPost("/api/tenantProfile", tenantProfile);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getServiceConfigurationRpcForDeviceTest() throws Exception {
|
public void getServiceConfigurationRpcForDeviceTest() throws Exception {
|
||||||
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
||||||
|
|
||||||
profileConfiguration.setTransportDeviceMsgRateLimit("20:600");
|
profileConfiguration.setTransportDeviceMsgRateLimit("20:600");
|
||||||
profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600");
|
profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600");
|
||||||
@ -102,7 +81,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
.as("await callback").isTrue();
|
.as("await callback").isTrue();
|
||||||
|
|
||||||
var payload = callback.getPayloadBytes();
|
var payload = callback.getPayloadBytes();
|
||||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {
|
||||||
|
});
|
||||||
|
|
||||||
assertNotNull(response);
|
assertNotNull(response);
|
||||||
assertEquals(response.size(), 6);
|
assertEquals(response.size(), 6);
|
||||||
@ -118,8 +98,9 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void getServiceConfigurationRpcForGatewayTest() throws Exception {
|
public void getServiceConfigurationRpcForGatewayTest() throws Exception {
|
||||||
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration();
|
||||||
|
|
||||||
profileConfiguration.setTransportGatewayMsgRateLimit("100:600");
|
profileConfiguration.setTransportGatewayMsgRateLimit("100:600");
|
||||||
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600");
|
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600");
|
||||||
@ -149,7 +130,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes
|
|||||||
assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||||
|
|
||||||
var payload = callback.getPayloadBytes();
|
var payload = callback.getPayloadBytes();
|
||||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {
|
||||||
|
});
|
||||||
|
|
||||||
assertNotNull(response);
|
assertNotNull(response);
|
||||||
assertEquals(response.size(), 9);
|
assertEquals(response.size(), 9);
|
||||||
|
|||||||
@ -499,12 +499,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
|
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
|
||||||
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
|
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
|
||||||
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
|
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
|
||||||
if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) {
|
|
||||||
toServerRpcSubTopicType = TopicType.V1;
|
toServerRpcSubTopicType = TopicType.V1;
|
||||||
|
if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) {
|
||||||
onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg);
|
onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg);
|
||||||
} else {
|
} else {
|
||||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
||||||
toServerRpcSubTopicType = TopicType.V1;
|
|
||||||
}
|
}
|
||||||
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
|
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
|
||||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
|
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
|
||||||
@ -1342,11 +1341,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
|
private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
|
||||||
TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
|
var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId());
|
||||||
|
DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration();
|
||||||
var tenantProfile = context.getTenantProfileCache().get(tenantId);
|
|
||||||
TenantProfileData profileData = tenantProfile.getProfileData();
|
|
||||||
DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
|
|
||||||
Map<String, Object> serviceConfiguration = new HashMap<>();
|
Map<String, Object> serviceConfiguration = new HashMap<>();
|
||||||
|
|
||||||
if (sessionInfo.getIsGateway()) {
|
if (sessionInfo.getIsGateway()) {
|
||||||
|
|||||||
@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
|
|||||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
|
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
|
||||||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
|
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
|
||||||
@ -40,6 +41,8 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
|
|||||||
@Getter
|
@Getter
|
||||||
private volatile DeviceId deviceId;
|
private volatile DeviceId deviceId;
|
||||||
@Getter
|
@Getter
|
||||||
|
private volatile TenantId tenantId;
|
||||||
|
@Getter
|
||||||
protected volatile TransportDeviceInfo deviceInfo;
|
protected volatile TransportDeviceInfo deviceInfo;
|
||||||
@Getter
|
@Getter
|
||||||
@Setter
|
@Setter
|
||||||
@ -58,6 +61,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
|
|||||||
public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
|
public void setDeviceInfo(TransportDeviceInfo deviceInfo) {
|
||||||
this.deviceInfo = deviceInfo;
|
this.deviceInfo = deviceInfo;
|
||||||
this.deviceId = deviceInfo.getDeviceId();
|
this.deviceId = deviceInfo.getDeviceId();
|
||||||
|
this.tenantId = deviceInfo.getTenantId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -65,7 +69,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
|
|||||||
this.sessionInfo = sessionInfo;
|
this.sessionInfo = sessionInfo;
|
||||||
this.deviceProfile = deviceProfile;
|
this.deviceProfile = deviceProfile;
|
||||||
this.deviceInfo.setDeviceType(deviceProfile.getName());
|
this.deviceInfo.setDeviceType(deviceProfile.getName());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user