diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java new file mode 100644 index 0000000000..d2a00599bd --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/rpc/MqttClientSideRpcIntegrationTest.java @@ -0,0 +1,168 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt.mqttv3.rpc; + +import com.fasterxml.jackson.core.type.TypeReference; +import io.netty.handler.codec.mqtt.MqttQoS; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.TransportPayloadType; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; +import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; +import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; + +@DaoSqlTest +public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTest { + + @Value("${transport.mqtt.netty.max_payload_size}") + private Integer maxPayloadSize; + + @Value("${transport.mqtt.msg_queue_size_per_device_limit:100}") + private int maxQueueSize; + + @Before + public void beforeTest() throws Exception { + loginSysAdmin(); + + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + Assert.assertNotNull(tenantProfile); + + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportGatewayMsgRateLimit(null); + profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null); + profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null); + + profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null); + profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null); + profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null); + + doPost("/api/tenantProfile", tenantProfile); + } + + @Test + public void getServiceConfigurationRpcForDeviceTest() throws Exception { + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .deviceName("Test Get Service Configuration") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(accessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + + assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .as("await callback").isTrue(); + + var payload = callback.getPayloadBytes(); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + + assertNotNull(response); + assertEquals(response.size(), 6); + assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit()); + assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit()); + assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit()); + assertEquals(response.get("maxPayloadSize"), maxPayloadSize); + assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + + client.disconnect(); + } + + @Test + public void getServiceConfigurationRpcForGatewayTest() throws Exception { + TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + + profileConfiguration.setTransportGatewayMsgRateLimit("100:600"); + profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600"); + profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit("200:600"); + + profileConfiguration.setTransportGatewayDeviceMsgRateLimit("20:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit("10:600"); + profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit("40:600"); + + doPost("/api/tenantProfile", tenantProfile); + + MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() + .gatewayName("Test Get Service Configuration Gateway") + .transportPayloadType(TransportPayloadType.JSON) + .build(); + processBeforeTest(configProperties); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(gatewayAccessToken); + + MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1"); + client.setCallback(callback); + client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE); + + client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes()); + + assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)); + + var payload = callback.getPayloadBytes(); + Map response = JacksonUtil.fromBytes(payload, new TypeReference<>() {}); + + assertNotNull(response); + assertEquals(response.size(), 9); + assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit()); + assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit()); + assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit()); + assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit()); + assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit()); + assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + assertEquals(response.get("maxPayloadSize"), maxPayloadSize); + assertEquals(response.get("maxQueueSize"), maxQueueSize); + assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name()); + + client.disconnect(); + } +} diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 16d72ff89f..b8fd79be4b 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportContext; +import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; @@ -51,6 +52,10 @@ public class MqttTransportContext extends TransportContext { @Autowired private ProtoMqttAdaptor protoMqttAdaptor; + @Getter + @Autowired + private TransportTenantProfileCache tenantProfileCache; + @Getter @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index eeeb287dc8..6914044047 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -58,8 +58,11 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; +import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; @@ -96,7 +99,9 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; @@ -130,6 +135,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN); private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN); + private static final String SERVICE_CONFIGURATION = "getServiceConfiguration"; private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE"; @@ -493,8 +499,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg)); } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) { TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC); - transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); - toServerRpcSubTopicType = TopicType.V1; + if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) { + toServerRpcSubTopicType = TopicType.V1; + onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg); + } else { + transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg)); + toServerRpcSubTopicType = TopicType.V1; + } } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); @@ -1330,6 +1341,40 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) { + TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + + var tenantProfile = context.getTenantProfileCache().get(tenantId); + TenantProfileData profileData = tenantProfile.getProfileData(); + DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + Map serviceConfiguration = new HashMap<>(); + + if (sessionInfo.getIsGateway()) { + serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit()); + serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit()); + serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit()); + serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit()); + serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit()); + serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit()); + } else { + serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit()); + serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit()); + serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit()); + } + serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize()); + serviceConfiguration.put("maxQueueSize", context.getMessageQueueSizePerDeviceLimit()); + serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType()); + + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); + + TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder() + .setRequestId(rpcRequestMsg.getRequestId()) + .setPayload(JacksonUtil.toString(serviceConfiguration)) + .build(); + + onToServerRpcResponse(responseMsg); + } + private void handleToSparkplugDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws ThingsboardException { SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName()); SparkplugRpcRequestHeader header; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 6562fa65bf..a3b254c99e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -82,6 +82,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter(); private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter(); + @Getter private volatile TransportPayloadType payloadType = TransportPayloadType.JSON; private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor; private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;