diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index 4961985be9..c4157c0714 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -17,8 +17,6 @@ package org.thingsboard.server.transport.mqtt.mqttv3.rpc; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.JacksonUtil; @@ -52,30 +50,11 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes @Value("${transport.mqtt.msg_queue_size_per_device_limit}") private int maxInflightMessages; - @Before - public void beforeTest() throws Exception { - loginSysAdmin(); - - TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - Assert.assertNotNull(tenantProfile); - - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); - - profileConfiguration.setTransportGatewayMsgRateLimit(null); - profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null); - profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null); - - profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null); - profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null); - profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null); - - doPost("/api/tenantProfile", tenantProfile); - } - @Test public void getServiceConfigurationRpcForDeviceTest() throws Exception { + loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); profileConfiguration.setTransportDeviceMsgRateLimit("20:600"); profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600"); @@ -102,7 +81,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes .as("await callback").isTrue(); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { + }); assertNotNull(response); assertEquals(response.size(), 6); @@ -118,8 +98,9 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes @Test public void getServiceConfigurationRpcForGatewayTest() throws Exception { + loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); profileConfiguration.setTransportGatewayMsgRateLimit("100:600"); profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600"); @@ -149,7 +130,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { + }); assertNotNull(response); assertEquals(response.size(), 9); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index bbdc3b1e87..151131091b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -499,12 +499,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); + toServerRpcSubTopicType = TopicType.V1; if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { - toServerRpcSubTopicType = TopicType.V1; onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); } else { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); - toServerRpcSubTopicType = TopicType.V1; } } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); @@ -1342,11 +1341,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { - TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); - - var tenantProfile = context.getTenantProfileCache().get(tenantId); - TenantProfileData profileData = tenantProfile.getProfileData(); - DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); + DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); Map serviceConfiguration = new HashMap<>(); if (sessionInfo.getIsGateway()) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 4c30609e18..1558bbc08b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -40,6 +41,8 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @Getter private volatile DeviceId deviceId; @Getter + private volatile TenantId tenantId; + @Getter protected volatile TransportDeviceInfo deviceInfo; @Getter @Setter @@ -58,6 +61,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public void setDeviceInfo(TransportDeviceInfo deviceInfo) { this.deviceInfo = deviceInfo; this.deviceId = deviceInfo.getDeviceId(); + this.tenantId = deviceInfo.getTenantId(); } @Override @@ -65,7 +69,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { this.sessionInfo = sessionInfo; this.deviceProfile = deviceProfile; this.deviceInfo.setDeviceType(deviceProfile.getName()); - } @Override