From 80d69df5f4754a6b2d7db786d83bd4c61d3ba7cc Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 20 Aug 2024 15:01:10 +0200 Subject: [PATCH 1/6] added support of getSerbiceConfiguration RPC --- .../rpc/MqttClientSideRpcIntegrationTest.java | 168 ++++++++++++++++++ .../transport/mqtt/MqttTransportContext.java | 5 + .../transport/mqtt/MqttTransportHandler.java | 49 ++++- .../mqtt/session/DeviceSessionCtx.java | 1 + 4 files changed, 221 insertions(+), 2 deletions(-) create mode 100644 application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java new file mode 100644 index 0000000000..d2a00599bd --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -0,0 +1,168 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import com.fasterxml.jackson.core.type.TypeReference; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; + +@DaoSqlTest +public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTest { + + @Value("${transport.mqtt.netty.max_payload_size}") + private Integer maxPayloadSize; + + @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") + private int maxQueueSize; + + @Before + public void beforeTest() throws Exception { + loginSysAdmin(); + + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + Assert.assertNotNull(tenantProfile); + + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportGatewayMsgRateLimit(null); + profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null); + profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null); + + profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null); + profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null); + profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null); + + doPost("/api/tenantProfile", tenantProfile); + } + + @Test + public void getServiceConfigurationRpcForDeviceTest() throws Exception { + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Get Service Configuration") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); + + var payload = callback.getPayloadBytes(); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + + assertNotNull(response); + assertEquals(response.size(), 6); + assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit()); + assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); + assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + assertEquals(response.get("maxPayloadSize"), maxPayloadSize); + assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + + client.disconnect(); + } + + @Test + public void getServiceConfigurationRpcForGatewayTest() throws Exception { + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportGatewayMsgRateLimit("100:600"); + profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600"); + profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit("200:600"); + + profileConfiguration.setTransportGatewayDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .gatewayName("Test Get Service Configuration Gateway") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + + assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + + var payload = callback.getPayloadBytes(); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + + assertNotNull(response); + assertEquals(response.size(), 9); + assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit()); + assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); + assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); + assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); + assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + assertEquals(response.get("maxPayloadSize"), maxPayloadSize); + assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + + client.disconnect(); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 16d72ff89f..b8fd79be4b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportContext; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; @@ -51,6 +52,10 @@ public class MqttTransportContext extends TransportContext { @Autowired private ProtoMqttAdaptor protoMqttAdaptor; + @Getter + @Autowired + private TransportTenantProfileCache tenantProfileCache; + @Getter @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index eeeb287dc8..6914044047 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -58,8 +58,11 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; @@ -96,7 +99,9 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; @@ -130,6 +135,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); + private static final String SERVICE_CONFIGURATION = "getServiceConfiguration"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -493,8 +499,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); - toServerRpcSubTopicType = TopicType.V1; + if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { + toServerRpcSubTopicType = TopicType.V1; + onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); + } else { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V1; + } } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); @@ -1330,6 +1341,40 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + + var tenantProfile = context.getTenantProfileCache().get(tenantId); + TenantProfileData profileData = tenantProfile.getProfileData(); + DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + Map serviceConfiguration = new HashMap<>(); + + if (sessionInfo.getIsGateway()) { + serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit()); + serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit()); + serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit()); + serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit()); + serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); + serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + } else { + serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit()); + serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit()); + serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit()); + } + serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize()); + serviceConfiguration.put("maxQueueSize", context.getMessageQueueSizePerDeviceLimit()); + serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType()); + + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); + + TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() + .setRequestId(rpcRequestMsg.getRequestId()) + .setPayload(JacksonUtil.toString(serviceConfiguration)) + .build(); + + onToServerRpcResponse(responseMsg); + } + private void handleToSparkplugDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws ThingsboardException { SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); SparkplugRpcRequestHeader header; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 6562fa65bf..a3b254c99e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -82,6 +82,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter(); private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); + @Getter private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor; private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor; From b5fff2e44e4a234a9d8767bc3271e0407552f3d7 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 21 Aug 2024 10:27:21 +0200 Subject: [PATCH 2/6] minor refactoring --- .../mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java | 8 ++++---- .../server/transport/mqtt/MqttTransportHandler.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index d2a00599bd..4961985be9 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -49,8 +49,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; - @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") - private int maxQueueSize; + @Value("${transport.mqtt.msg_queue_size_per_device_limit}") + private int maxInflightMessages; @Before public void beforeTest() throws Exception { @@ -110,7 +110,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); - assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); client.disconnect(); @@ -160,7 +160,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); - assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); client.disconnect(); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 6914044047..bbdc3b1e87 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1362,7 +1362,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit()); } serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize()); - serviceConfiguration.put("maxQueueSize", context.getMessageQueueSizePerDeviceLimit()); + serviceConfiguration.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType()); ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); From 805deffaa2ee56f6e9818aae236f8b5c9c1c678b Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 21 Aug 2024 11:39:16 +0200 Subject: [PATCH 3/6] refactored due to comments --- .../rpc/MqttClientSideRpcIntegrationTest.java | 34 +++++-------------- .../transport/mqtt/MqttTransportHandler.java | 10 ++---- .../session/DeviceAwareSessionContext.java | 5 ++- 3 files changed, 15 insertions(+), 34 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index 4961985be9..c4157c0714 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -17,8 +17,6 @@ package org.thingsboard.server.transport.mqtt.mqttv3.rpc; import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; -import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.JacksonUtil; @@ -52,30 +50,11 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes @Value("${transport.mqtt.msg_queue_size_per_device_limit}") private int maxInflightMessages; - @Before - public void beforeTest() throws Exception { - loginSysAdmin(); - - TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - Assert.assertNotNull(tenantProfile); - - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); - - profileConfiguration.setTransportGatewayMsgRateLimit(null); - profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null); - profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null); - - profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null); - profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null); - profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null); - - doPost("/api/tenantProfile", tenantProfile); - } - @Test public void getServiceConfigurationRpcForDeviceTest() throws Exception { + loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); profileConfiguration.setTransportDeviceMsgRateLimit("20:600"); profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600"); @@ -102,7 +81,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes .as("await callback").isTrue(); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { + }); assertNotNull(response); assertEquals(response.size(), 6); @@ -118,8 +98,9 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes @Test public void getServiceConfigurationRpcForGatewayTest() throws Exception { + loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); profileConfiguration.setTransportGatewayMsgRateLimit("100:600"); profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600"); @@ -149,7 +130,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { + }); assertNotNull(response); assertEquals(response.size(), 9); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index bbdc3b1e87..151131091b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -499,12 +499,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); + toServerRpcSubTopicType = TopicType.V1; if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { - toServerRpcSubTopicType = TopicType.V1; onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); } else { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); - toServerRpcSubTopicType = TopicType.V1; } } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); @@ -1342,11 +1341,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { - TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); - - var tenantProfile = context.getTenantProfileCache().get(tenantId); - TenantProfileData profileData = tenantProfile.getProfileData(); - DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); + DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); Map serviceConfiguration = new HashMap<>(); if (sessionInfo.getIsGateway()) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 4c30609e18..1558bbc08b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -40,6 +41,8 @@ public abstract class DeviceAwareSessionContext implements SessionContext { @Getter private volatile DeviceId deviceId; @Getter + private volatile TenantId tenantId; + @Getter protected volatile TransportDeviceInfo deviceInfo; @Getter @Setter @@ -58,6 +61,7 @@ public abstract class DeviceAwareSessionContext implements SessionContext { public void setDeviceInfo(TransportDeviceInfo deviceInfo) { this.deviceInfo = deviceInfo; this.deviceId = deviceInfo.getDeviceId(); + this.tenantId = deviceInfo.getTenantId(); } @Override @@ -65,7 +69,6 @@ public abstract class DeviceAwareSessionContext implements SessionContext { this.sessionInfo = sessionInfo; this.deviceProfile = deviceProfile; this.deviceInfo.setDeviceType(deviceProfile.getName()); - } @Override From dd3ed2a496069b02ef218ffd36d96bdb984810b3 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 2 Sep 2024 13:11:06 +0200 Subject: [PATCH 4/6] rafactoring due to comments --- .../rpc/MqttClientSideRpcIntegrationTest.java | 40 ++++++++++--------- .../transport/mqtt/MqttTransportHandler.java | 40 +++++++++---------- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index c4157c0714..6c08b61f09 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -51,7 +51,7 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes private int maxInflightMessages; @Test - public void getServiceConfigurationRpcForDeviceTest() throws Exception { + public void getSessionLimitsRpcForDeviceTest() throws Exception { loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); @@ -75,29 +75,30 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes client.setCallback(callback); client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); - client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) .as("await callback").isTrue(); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { - }); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); assertNotNull(response); - assertEquals(response.size(), 6); - assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit()); - assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); - assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + assertEquals(4, response.size()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + Map rateLimits = (Map) response.get("rateLimits"); + assertEquals(3, rateLimits.size()); + assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportDeviceMsgRateLimit()); + assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); client.disconnect(); } @Test - public void getServiceConfigurationRpcForGatewayTest() throws Exception { + public void getSessionLimitsRpcForGatewayTest() throws Exception { loginSysAdmin(); TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); DefaultTenantProfileConfiguration profileConfiguration = tenantProfile.getDefaultProfileConfiguration(); @@ -125,25 +126,26 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes client.setCallback(callback); client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); - client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getSessionLimits\",\"params\":{}}".getBytes()); assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() { - }); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); assertNotNull(response); - assertEquals(response.size(), 9); - assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit()); - assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); - assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); - assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); - assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); - assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + assertEquals(4, response.size()); assertEquals(response.get("maxPayloadSize"), maxPayloadSize); assertEquals(response.get("maxInflightMessages"), maxInflightMessages); assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + Map rateLimits = (Map) response.get("rateLimits"); + assertEquals(6, rateLimits.size()); + assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportGatewayMsgRateLimit()); + assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + assertEquals(rateLimits.get("deviceMessages"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); + assertEquals(rateLimits.get("deviceTelemetryMessages"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); + assertEquals(rateLimits.get("deviceTelemetryDataPoints"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); client.disconnect(); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 151131091b..0fce2048a4 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -58,11 +58,9 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; @@ -135,7 +133,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); - private static final String SERVICE_CONFIGURATION = "getServiceConfiguration"; + private static final String SESSION_LIMITS = "getSessionLimits"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -500,8 +498,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); toServerRpcSubTopicType = TopicType.V1; - if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { - onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); + if (SESSION_LIMITS.equals(rpcRequestMsg.getMethodName())) { + onGetSessionLimitsRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); } else { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); } @@ -1340,32 +1338,34 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); - Map serviceConfiguration = new HashMap<>(); + Map sessionLimits = new HashMap<>(); + Map rateLimits = new HashMap<>(); if (sessionInfo.getIsGateway()) { - serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit()); - serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit()); - serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit()); - serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit()); - serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); - serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + rateLimits.put("messages", profile.getTransportGatewayMsgRateLimit()); + rateLimits.put("telemetryMessages", profile.getTransportGatewayTelemetryMsgRateLimit()); + rateLimits.put("telemetryDataPoints", profile.getTransportGatewayTelemetryDataPointsRateLimit()); + rateLimits.put("deviceMessages", profile.getTransportGatewayDeviceMsgRateLimit()); + rateLimits.put("deviceTelemetryMessages", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); + rateLimits.put("deviceTelemetryDataPoints", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); } else { - serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit()); - serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit()); - serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit()); + rateLimits.put("messages", profile.getTransportDeviceMsgRateLimit()); + rateLimits.put("telemetryMessages", profile.getTransportDeviceTelemetryMsgRateLimit()); + rateLimits.put("telemetryDataPoints", profile.getTransportDeviceTelemetryDataPointsRateLimit()); } - serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize()); - serviceConfiguration.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); - serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType()); + sessionLimits.put("rateLimits", rateLimits); + sessionLimits.put("maxPayloadSize", context.getMaxPayloadSize()); + sessionLimits.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); + sessionLimits.put("payloadType", deviceSessionCtx.getPayloadType()); ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() .setRequestId(rpcRequestMsg.getRequestId()) - .setPayload(JacksonUtil.toString(serviceConfiguration)) + .setPayload(JacksonUtil.toString(sessionLimits)) .build(); onToServerRpcResponse(responseMsg); From aab1161c796d55b369b1f82a5376a61d0faf59c8 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 3 Sep 2024 10:58:40 +0200 Subject: [PATCH 5/6] create pojo for session limits --- .../rpc/MqttClientSideRpcIntegrationTest.java | 58 +++++++++---------- .../transport/mqtt/MqttTransportHandler.java | 41 +++++++------ .../mqtt/limits/GatewaySessionLimits.java | 25 ++++++++ .../transport/mqtt/limits/SessionLimits.java | 30 ++++++++++ 4 files changed, 106 insertions(+), 48 deletions(-) create mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java create mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index 6c08b61f09..2a700e484d 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.transport.mqtt.mqttv3.rpc; -import com.fasterxml.jackson.core.type.TypeReference; import io.netty.handler.codec.mqtt.MqttQoS; import org.junit.Test; import org.springframework.beans.factory.annotation.Value; @@ -26,16 +25,16 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import org.thingsboard.server.transport.mqtt.limits.GatewaySessionLimits; +import org.thingsboard.server.transport.mqtt.limits.SessionLimits; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; -import java.util.Map; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; @@ -62,6 +61,15 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes doPost("/api/tenantProfile", tenantProfile); + var expectedLimits = new SessionLimits(); + var deviceLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportDeviceMsgRateLimit(), + profileConfiguration.getTransportDeviceTelemetryMsgRateLimit(), + profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + expectedLimits.setRateLimits(deviceLimits); + expectedLimits.setMaxPayloadSize(maxPayloadSize); + expectedLimits.setMaxInflightMessages(maxInflightMessages); + expectedLimits.setPayloadType(TransportPayloadType.JSON); + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .deviceName("Test Get Service Configuration") .transportPayloadType(TransportPayloadType.JSON) @@ -81,18 +89,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes .as("await callback").isTrue(); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); - - assertNotNull(response); - assertEquals(4, response.size()); - assertEquals(response.get("maxPayloadSize"), maxPayloadSize); - assertEquals(response.get("maxInflightMessages"), maxInflightMessages); - assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); - Map rateLimits = (Map) response.get("rateLimits"); - assertEquals(3, rateLimits.size()); - assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportDeviceMsgRateLimit()); - assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); - assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + SessionLimits actualLimits = JacksonUtil.fromBytes(payload, SessionLimits.class); + assertEquals(expectedLimits, actualLimits); client.disconnect(); } @@ -113,6 +111,19 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes doPost("/api/tenantProfile", tenantProfile); + var expectedLimits = new GatewaySessionLimits(); + var gatewayLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportGatewayMsgRateLimit(), + profileConfiguration.getTransportGatewayTelemetryMsgRateLimit(), + profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + var gatewayDeviceLimits = new SessionLimits.SessionRateLimits(profileConfiguration.getTransportGatewayDeviceMsgRateLimit(), + profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit(), + profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + expectedLimits.setGatewayRateLimits(gatewayLimits); + expectedLimits.setRateLimits(gatewayDeviceLimits); + expectedLimits.setMaxPayloadSize(maxPayloadSize); + expectedLimits.setMaxInflightMessages(maxInflightMessages); + expectedLimits.setPayloadType(TransportPayloadType.JSON); + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .gatewayName("Test Get Service Configuration Gateway") .transportPayloadType(TransportPayloadType.JSON) @@ -131,21 +142,8 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); var payload = callback.getPayloadBytes(); - Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); - - assertNotNull(response); - assertEquals(4, response.size()); - assertEquals(response.get("maxPayloadSize"), maxPayloadSize); - assertEquals(response.get("maxInflightMessages"), maxInflightMessages); - assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); - Map rateLimits = (Map) response.get("rateLimits"); - assertEquals(6, rateLimits.size()); - assertEquals(rateLimits.get("messages"), profileConfiguration.getTransportGatewayMsgRateLimit()); - assertEquals(rateLimits.get("telemetryMessages"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); - assertEquals(rateLimits.get("telemetryDataPoints"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); - assertEquals(rateLimits.get("deviceMessages"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); - assertEquals(rateLimits.get("deviceTelemetryMessages"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); - assertEquals(rateLimits.get("deviceTelemetryDataPoints"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + SessionLimits actualLimits = JacksonUtil.fromBytes(payload, GatewaySessionLimits.class); + assertEquals(expectedLimits, actualLimits); client.disconnect(); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 0fce2048a4..823e558370 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -79,6 +79,8 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; +import org.thingsboard.server.transport.mqtt.limits.GatewaySessionLimits; +import org.thingsboard.server.transport.mqtt.limits.SessionLimits; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; @@ -97,9 +99,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; @@ -936,7 +936,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } else { log.debug("[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found", sessionId, mqttMsg.variableHeader().messageId(), topicName); - unSubResults.add((short)MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); + unSubResults.add((short) MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); } } if (!activityReported) { @@ -1341,25 +1341,30 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void onGetSessionLimitsRpc(TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { var tenantProfile = context.getTenantProfileCache().get(deviceSessionCtx.getTenantId()); DefaultTenantProfileConfiguration profile = tenantProfile.getDefaultProfileConfiguration(); - Map sessionLimits = new HashMap<>(); - Map rateLimits = new HashMap<>(); + + SessionLimits sessionLimits; if (sessionInfo.getIsGateway()) { - rateLimits.put("messages", profile.getTransportGatewayMsgRateLimit()); - rateLimits.put("telemetryMessages", profile.getTransportGatewayTelemetryMsgRateLimit()); - rateLimits.put("telemetryDataPoints", profile.getTransportGatewayTelemetryDataPointsRateLimit()); - rateLimits.put("deviceMessages", profile.getTransportGatewayDeviceMsgRateLimit()); - rateLimits.put("deviceTelemetryMessages", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); - rateLimits.put("deviceTelemetryDataPoints", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + var gatewaySessionLimits = new GatewaySessionLimits(); + var gatewayLimits = new SessionLimits.SessionRateLimits(profile.getTransportGatewayMsgRateLimit(), + profile.getTransportGatewayTelemetryMsgRateLimit(), + profile.getTransportGatewayTelemetryDataPointsRateLimit()); + var gatewayDeviceLimits = new SessionLimits.SessionRateLimits(profile.getTransportGatewayDeviceMsgRateLimit(), + profile.getTransportGatewayDeviceTelemetryMsgRateLimit(), + profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + gatewaySessionLimits.setGatewayRateLimits(gatewayLimits); + gatewaySessionLimits.setRateLimits(gatewayDeviceLimits); + sessionLimits = gatewaySessionLimits; } else { - rateLimits.put("messages", profile.getTransportDeviceMsgRateLimit()); - rateLimits.put("telemetryMessages", profile.getTransportDeviceTelemetryMsgRateLimit()); - rateLimits.put("telemetryDataPoints", profile.getTransportDeviceTelemetryDataPointsRateLimit()); + var rateLimits = new SessionLimits.SessionRateLimits(profile.getTransportDeviceMsgRateLimit(), + profile.getTransportDeviceTelemetryMsgRateLimit(), + profile.getTransportDeviceTelemetryDataPointsRateLimit()); + sessionLimits = new SessionLimits(); + sessionLimits.setRateLimits(rateLimits); } - sessionLimits.put("rateLimits", rateLimits); - sessionLimits.put("maxPayloadSize", context.getMaxPayloadSize()); - sessionLimits.put("maxInflightMessages", context.getMessageQueueSizePerDeviceLimit()); - sessionLimits.put("payloadType", deviceSessionCtx.getPayloadType()); + sessionLimits.setMaxPayloadSize(context.getMaxPayloadSize()); + sessionLimits.setMaxInflightMessages(context.getMessageQueueSizePerDeviceLimit()); + sessionLimits.setPayloadType(deviceSessionCtx.getPayloadType()); ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java new file mode 100644 index 0000000000..6b59f6bbbd --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/GatewaySessionLimits.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.limits; + +import lombok.Data; + +@Data +public class GatewaySessionLimits extends SessionLimits { + + private SessionRateLimits gatewayRateLimits; + +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java new file mode 100644 index 0000000000..7651d27386 --- /dev/null +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.limits; + +import lombok.Data; +import org.thingsboard.server.common.data.TransportPayloadType; + +@Data +public class SessionLimits { + + private int maxPayloadSize; + private int maxInflightMessages; + private TransportPayloadType payloadType; + private SessionRateLimits rateLimits; + + public record SessionRateLimits(String messages, String telemetryMessages, String telemetryDataPoints) {} +} From 387a7eadcfd622ef635cb97544c577e20209f473 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 5 Sep 2024 20:00:43 +0200 Subject: [PATCH 6/6] removed payloadType from SessionLimits --- .../mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java | 2 -- .../thingsboard/server/transport/mqtt/MqttTransportHandler.java | 1 - .../thingsboard/server/transport/mqtt/limits/SessionLimits.java | 1 - 3 files changed, 4 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java index 2a700e484d..6489054ba9 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -68,7 +68,6 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes expectedLimits.setRateLimits(deviceLimits); expectedLimits.setMaxPayloadSize(maxPayloadSize); expectedLimits.setMaxInflightMessages(maxInflightMessages); - expectedLimits.setPayloadType(TransportPayloadType.JSON); MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .deviceName("Test Get Service Configuration") @@ -122,7 +121,6 @@ public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTes expectedLimits.setRateLimits(gatewayDeviceLimits); expectedLimits.setMaxPayloadSize(maxPayloadSize); expectedLimits.setMaxInflightMessages(maxInflightMessages); - expectedLimits.setPayloadType(TransportPayloadType.JSON); MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() .gatewayName("Test Get Service Configuration Gateway") diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 823e558370..4212386410 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1364,7 +1364,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } sessionLimits.setMaxPayloadSize(context.getMaxPayloadSize()); sessionLimits.setMaxInflightMessages(context.getMessageQueueSizePerDeviceLimit()); - sessionLimits.setPayloadType(deviceSessionCtx.getPayloadType()); ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java index 7651d27386..b607bfadfc 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/limits/SessionLimits.java @@ -23,7 +23,6 @@ public class SessionLimits { private int maxPayloadSize; private int maxInflightMessages; - private TransportPayloadType payloadType; private SessionRateLimits rateLimits; public record SessionRateLimits(String messages, String telemetryMessages, String telemetryDataPoints) {}