added support of getSerbiceConfiguration RPC
This commit is contained in:
parent
275f9004c5
commit
80d69df5f4
@ -0,0 +1,168 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.TransportPayloadType;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
|
||||
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
|
||||
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestSubscribeOnTopicCallback;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
|
||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
|
||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
|
||||
|
||||
@DaoSqlTest
|
||||
public class MqttClientSideRpcIntegrationTest extends AbstractMqttIntegrationTest {
|
||||
|
||||
@Value("${transport.mqtt.netty.max_payload_size}")
|
||||
private Integer maxPayloadSize;
|
||||
|
||||
@Value("${transport.mqtt.msg_queue_size_per_device_limit:100}")
|
||||
private int maxQueueSize;
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
loginSysAdmin();
|
||||
|
||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||
Assert.assertNotNull(tenantProfile);
|
||||
|
||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
||||
|
||||
profileConfiguration.setTransportGatewayMsgRateLimit(null);
|
||||
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit(null);
|
||||
profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit(null);
|
||||
|
||||
profileConfiguration.setTransportGatewayDeviceMsgRateLimit(null);
|
||||
profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit(null);
|
||||
profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit(null);
|
||||
|
||||
doPost("/api/tenantProfile", tenantProfile);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getServiceConfigurationRpcForDeviceTest() throws Exception {
|
||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
||||
|
||||
profileConfiguration.setTransportDeviceMsgRateLimit("20:600");
|
||||
profileConfiguration.setTransportDeviceTelemetryMsgRateLimit("10:600");
|
||||
profileConfiguration.setTransportDeviceTelemetryDataPointsRateLimit("40:600");
|
||||
|
||||
doPost("/api/tenantProfile", tenantProfile);
|
||||
|
||||
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
|
||||
.deviceName("Test Get Service Configuration")
|
||||
.transportPayloadType(TransportPayloadType.JSON)
|
||||
.build();
|
||||
processBeforeTest(configProperties);
|
||||
|
||||
MqttTestClient client = new MqttTestClient();
|
||||
client.connectAndWait(accessToken);
|
||||
|
||||
MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1");
|
||||
client.setCallback(callback);
|
||||
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes());
|
||||
|
||||
assertThat(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
|
||||
.as("await callback").isTrue();
|
||||
|
||||
var payload = callback.getPayloadBytes();
|
||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(response.size(), 6);
|
||||
assertEquals(response.get("deviceMsgRateLimit"), profileConfiguration.getTransportDeviceMsgRateLimit());
|
||||
assertEquals(response.get("deviceTelemetryMsgRateLimit"), profileConfiguration.getTransportDeviceTelemetryMsgRateLimit());
|
||||
assertEquals(response.get("deviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||
assertEquals(response.get("maxQueueSize"), maxQueueSize);
|
||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||
|
||||
client.disconnect();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getServiceConfigurationRpcForGatewayTest() throws Exception {
|
||||
TenantProfile tenantProfile = doGet("/api/tenantProfile/" + tenantProfileId, TenantProfile.class);
|
||||
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration();
|
||||
|
||||
profileConfiguration.setTransportGatewayMsgRateLimit("100:600");
|
||||
profileConfiguration.setTransportGatewayTelemetryMsgRateLimit("50:600");
|
||||
profileConfiguration.setTransportGatewayTelemetryDataPointsRateLimit("200:600");
|
||||
|
||||
profileConfiguration.setTransportGatewayDeviceMsgRateLimit("20:600");
|
||||
profileConfiguration.setTransportGatewayDeviceTelemetryMsgRateLimit("10:600");
|
||||
profileConfiguration.setTransportGatewayDeviceTelemetryDataPointsRateLimit("40:600");
|
||||
|
||||
doPost("/api/tenantProfile", tenantProfile);
|
||||
|
||||
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
|
||||
.gatewayName("Test Get Service Configuration Gateway")
|
||||
.transportPayloadType(TransportPayloadType.JSON)
|
||||
.build();
|
||||
processBeforeTest(configProperties);
|
||||
|
||||
MqttTestClient client = new MqttTestClient();
|
||||
client.connectAndWait(gatewayAccessToken);
|
||||
|
||||
MqttTestCallback callback = new MqttTestSubscribeOnTopicCallback(DEVICE_RPC_RESPONSE_TOPIC + "1");
|
||||
client.setCallback(callback);
|
||||
client.subscribeAndWait(DEVICE_RPC_RESPONSE_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
|
||||
|
||||
client.publishAndWait(DEVICE_RPC_REQUESTS_TOPIC + "1", "{\"method\":\"getServiceConfiguration\",\"params\":{}}".getBytes());
|
||||
|
||||
assertTrue(callback.getSubscribeLatch().await(DEFAULT_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||
|
||||
var payload = callback.getPayloadBytes();
|
||||
Map<String, Object> response = JacksonUtil.fromBytes(payload, new TypeReference<>() {});
|
||||
|
||||
assertNotNull(response);
|
||||
assertEquals(response.size(), 9);
|
||||
assertEquals(response.get("gatewayMsgRateLimit"), profileConfiguration.getTransportGatewayMsgRateLimit());
|
||||
assertEquals(response.get("gatewayTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayTelemetryMsgRateLimit());
|
||||
assertEquals(response.get("gatewayTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayTelemetryDataPointsRateLimit());
|
||||
assertEquals(response.get("gatewayDeviceMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceMsgRateLimit());
|
||||
assertEquals(response.get("gatewayDeviceTelemetryMsgRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
||||
assertEquals(response.get("gatewayDeviceTelemetryDataPointsRateLimit"), profileConfiguration.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
||||
assertEquals(response.get("maxPayloadSize"), maxPayloadSize);
|
||||
assertEquals(response.get("maxQueueSize"), maxQueueSize);
|
||||
assertEquals(response.get("payloadType"), TransportPayloadType.JSON.name());
|
||||
|
||||
client.disconnect();
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.transport.TransportContext;
|
||||
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
|
||||
|
||||
@ -51,6 +52,10 @@ public class MqttTransportContext extends TransportContext {
|
||||
@Autowired
|
||||
private ProtoMqttAdaptor protoMqttAdaptor;
|
||||
|
||||
@Getter
|
||||
@Autowired
|
||||
private TransportTenantProfileCache tenantProfileCache;
|
||||
|
||||
@Getter
|
||||
@Value("${transport.mqtt.netty.max_payload_size}")
|
||||
private Integer maxPayloadSize;
|
||||
|
||||
@ -58,8 +58,11 @@ import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
||||
import org.thingsboard.server.common.msg.EncryptionUtil;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||
@ -96,7 +99,9 @@ import java.security.cert.X509Certificate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -130,6 +135,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
private static final Pattern FW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_FIRMWARE_REQUEST_TOPIC_PATTERN);
|
||||
private static final Pattern SW_REQUEST_PATTERN = Pattern.compile(MqttTopics.DEVICE_SOFTWARE_REQUEST_TOPIC_PATTERN);
|
||||
|
||||
private static final String SERVICE_CONFIGURATION = "getServiceConfiguration";
|
||||
|
||||
private static final String PAYLOAD_TOO_LARGE = "PAYLOAD_TOO_LARGE";
|
||||
|
||||
@ -493,8 +499,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
|
||||
} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
|
||||
TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
||||
toServerRpcSubTopicType = TopicType.V1;
|
||||
if (SERVICE_CONFIGURATION.equals(rpcRequestMsg.getMethodName())) {
|
||||
toServerRpcSubTopicType = TopicType.V1;
|
||||
onGetServiceConfigurationRpc(deviceSessionCtx.getSessionInfo(), ctx, msgId, rpcRequestMsg);
|
||||
} else {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
|
||||
toServerRpcSubTopicType = TopicType.V1;
|
||||
}
|
||||
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
|
||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
|
||||
@ -1330,6 +1341,40 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
}
|
||||
}
|
||||
|
||||
private void onGetServiceConfigurationRpc( TransportProtos.SessionInfoProto sessionInfo, ChannelHandlerContext ctx, int msgId, TransportProtos.ToServerRpcRequestMsg rpcRequestMsg) {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
|
||||
|
||||
var tenantProfile = context.getTenantProfileCache().get(tenantId);
|
||||
TenantProfileData profileData = tenantProfile.getProfileData();
|
||||
DefaultTenantProfileConfiguration profile = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
|
||||
Map<String, Object> serviceConfiguration = new HashMap<>();
|
||||
|
||||
if (sessionInfo.getIsGateway()) {
|
||||
serviceConfiguration.put("gatewayMsgRateLimit", profile.getTransportGatewayMsgRateLimit());
|
||||
serviceConfiguration.put("gatewayTelemetryMsgRateLimit", profile.getTransportGatewayTelemetryMsgRateLimit());
|
||||
serviceConfiguration.put("gatewayTelemetryDataPointsRateLimit", profile.getTransportGatewayTelemetryDataPointsRateLimit());
|
||||
serviceConfiguration.put("gatewayDeviceMsgRateLimit", profile.getTransportGatewayDeviceMsgRateLimit());
|
||||
serviceConfiguration.put("gatewayDeviceTelemetryMsgRateLimit", profile.getTransportGatewayDeviceTelemetryMsgRateLimit());
|
||||
serviceConfiguration.put("gatewayDeviceTelemetryDataPointsRateLimit", profile.getTransportGatewayDeviceTelemetryDataPointsRateLimit());
|
||||
} else {
|
||||
serviceConfiguration.put("deviceMsgRateLimit", profile.getTransportDeviceMsgRateLimit());
|
||||
serviceConfiguration.put("deviceTelemetryMsgRateLimit", profile.getTransportDeviceTelemetryMsgRateLimit());
|
||||
serviceConfiguration.put("deviceTelemetryDataPointsRateLimit", profile.getTransportDeviceTelemetryDataPointsRateLimit());
|
||||
}
|
||||
serviceConfiguration.put("maxPayloadSize", context.getMaxPayloadSize());
|
||||
serviceConfiguration.put("maxQueueSize", context.getMessageQueueSizePerDeviceLimit());
|
||||
serviceConfiguration.put("payloadType", deviceSessionCtx.getPayloadType());
|
||||
|
||||
ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS);
|
||||
|
||||
TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder()
|
||||
.setRequestId(rpcRequestMsg.getRequestId())
|
||||
.setPayload(JacksonUtil.toString(serviceConfiguration))
|
||||
.build();
|
||||
|
||||
onToServerRpcResponse(responseMsg);
|
||||
}
|
||||
|
||||
private void handleToSparkplugDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest) throws ThingsboardException {
|
||||
SparkplugMessageType messageType = SparkplugMessageType.parseMessageType(rpcRequest.getMethodName());
|
||||
SparkplugRpcRequestHeader header;
|
||||
|
||||
@ -82,6 +82,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
|
||||
private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
|
||||
private volatile MqttTopicFilter attributesPublishTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
|
||||
private volatile MqttTopicFilter attributesSubscribeTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
|
||||
@Getter
|
||||
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;
|
||||
private volatile Descriptors.Descriptor attributesDynamicMessageDescriptor;
|
||||
private volatile Descriptors.Descriptor telemetryDynamicMessageDescriptor;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user