Merge pull request #7596 from imbeacon/mqtt5-codes

[3.5] Added Mqtt v5 reason codes for ack messages
This commit is contained in:
Andrew Shvayka 2022-12-08 14:40:14 +02:00 committed by GitHub
commit f813073bfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 2102 additions and 167 deletions

View File

@ -140,6 +140,10 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>

View File

@ -57,8 +57,8 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import java.util.List;
import java.util.Map;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
package org.thingsboard.server.transport.mqtt.mqttv3;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
package org.thingsboard.server.transport.mqtt.mqttv3;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@ -104,6 +104,10 @@ public class MqttTestClient {
return client.subscribe(topic, qoS.value());
}
public boolean isConnected() {
return client.isConnected();
}
public void enableManualAcks() {
client.setManualAcks(true);
}
@ -112,6 +116,10 @@ public class MqttTestClient {
client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
}
private MqttAsyncClient createClient() throws MqttException {
return createClient(null);
}
private MqttAsyncClient createClient(String clientId) throws MqttException {
if (StringUtils.isEmpty(clientId)) {
clientId = MqttAsyncClient.generateClientId();
@ -119,8 +127,4 @@ public class MqttTestClient {
return new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence());
}
private MqttAsyncClient createClient() throws MqttException {
return createClient(null);
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.protobuf.Descriptors;
@ -22,7 +22,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.squareup.wire.schema.internal.parser.ProtoFileElement;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DynamicProtoUtils;
@ -41,8 +40,8 @@ import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import java.util.ArrayList;
import java.util.List;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.request;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.request;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import java.util.ArrayList;
import java.util.List;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.request;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.request;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -21,7 +21,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
@Slf4j
@DaoSqlTest

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.request;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.request;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
@Slf4j
@DaoSqlTest

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.request;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.request;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import java.util.ArrayList;
import java.util.List;

View File

@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.updates;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.updates;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -21,7 +21,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.updates;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -21,7 +21,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.attributes.updates;
package org.thingsboard.server.transport.mqtt.mqttv3.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -21,7 +21,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.attributes.AbstractMqttAttributesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.attributes.AbstractMqttAttributesIntegrationTest;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.claim;
package org.thingsboard.server.transport.mqtt.mqttv3.claim;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.claim;
package org.thingsboard.server.transport.mqtt.mqttv3.claim;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -28,7 +28,7 @@ import org.thingsboard.server.dao.device.claim.ClaimResult;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import static org.junit.Assert.assertEquals;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.claim;
package org.thingsboard.server.transport.mqtt.mqttv3.claim;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.claim;
package org.thingsboard.server.transport.mqtt.mqttv3.claim;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
@ -21,7 +21,7 @@ import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@Slf4j

View File

@ -0,0 +1,50 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv3.client;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Assert;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
public abstract class AbstractMqttClientConnectionTest extends AbstractMqttIntegrationTest {
protected void processClientWithCorrectAccessTokenTest() throws Exception {
MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken);
Assert.assertTrue(client.isConnected());
client.disconnect();
}
protected void processClientWithWrongAccessTokenTest() throws Exception {
MqttTestClient client = new MqttTestClient();
try {
client.connectAndWait("wrongAccessToken");
} catch (MqttException e) {
Assert.assertEquals(MqttException.REASON_CODE_FAILED_AUTHENTICATION, e.getReasonCode());
}
}
protected void processClientWithWrongClientIdAndEmptyUsernamePasswordTest() throws Exception {
MqttTestClient client = new MqttTestClient("unknownClientId");
try {
client.connectAndWait();
} catch (MqttException e) {
Assert.assertEquals(MqttException.REASON_CODE_INVALID_CLIENT_ID, e.getReasonCode());
}
}
}

View File

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv3.client;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttClientConnectionTest extends AbstractMqttClientConnectionTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test MqttV5 client device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testClientWithCorrectAccessToken() throws Exception {
processClientWithCorrectAccessTokenTest();
}
@Test
public void testClientWithWrongAccessToken() throws Exception {
processClientWithWrongAccessTokenTest();
}
@Test
public void testClientWithWrongClientIdAndEmptyUsernamePassword() throws Exception {
processClientWithWrongClientIdAndEmptyUsernamePasswordTest();
}
}

View File

@ -13,10 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.credentials;
package org.thingsboard.server.transport.mqtt.mqttv3.credentials;
import com.fasterxml.jackson.core.type.TypeReference;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
@ -27,7 +28,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import java.util.Arrays;
import java.util.HashSet;
@ -114,11 +115,16 @@ public class BasicMqttCredentialsTest extends AbstractMqttIntegrationTest {
testTelemetryIsDelivered(accessToken2Device, mqttTestClient5);
}
@Test(expected = MqttSecurityException.class)
@Test(expected = MqttException.class)
public void testCorrectClientIdAndUserNameButWrongPassword() throws Exception {
// Not correct. Correct clientId and username, but wrong password
MqttTestClient mqttTestClient = new MqttTestClient(CLIENT_ID);
mqttTestClient.connectAndWait(USER_NAME3, "WRONG PASSWORD");
try {
mqttTestClient.connectAndWait(USER_NAME3, "WRONG PASSWORD");
Assert.fail(); // This should not happens, because we have a wrong password
} catch (MqttException e) {
Assert.assertEquals(4, e.getReasonCode()); // 4 - Reason code for bad username or password in MQTT v3
}
testTelemetryIsNotDelivered(clientIdAndUserNameAndPasswordDevice3, mqttTestClient);
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.provision;
package org.thingsboard.server.transport.mqtt.mqttv3.provision;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.handler.codec.mqtt.MqttQoS;
@ -33,8 +33,8 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.device.provision.ProvisionResponseStatus;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.concurrent.TimeUnit;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.provision;
package org.thingsboard.server.transport.mqtt.mqttv3.provision;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
@ -41,8 +41,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCre
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.concurrent.TimeUnit;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.rpc;
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -39,8 +39,8 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import java.util.ArrayList;
import java.util.List;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.rpc;
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.rpc;
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;

View File

@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.rpc;
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.rpc;
package org.thingsboard.server.transport.mqtt.mqttv3.rpc;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.attributes;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.attributes;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@ -24,7 +24,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.attributes;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.attributes;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.attributes;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.attributes;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.protobuf.Descriptors;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS;
@ -23,8 +23,8 @@ import org.junit.Test;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;

View File

@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries;
import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.protobuf.Descriptors;
@ -31,8 +31,8 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.MqttTestClient;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import java.util.Arrays;

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.nosql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.nosql;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesIntegrationTest;
@DaoNoSqlTest
public class MqttTimeseriesNoSqlIntegrationTest extends AbstractMqttTimeseriesIntegrationTest {

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.nosql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.nosql;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesJsonIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesJsonIntegrationTest;
@DaoNoSqlTest
public class MqttTimeseriesNoSqlJsonIntegrationTest extends AbstractMqttTimeseriesJsonIntegrationTest {

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.nosql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.nosql;
import org.thingsboard.server.dao.service.DaoNoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesProtoIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesProtoIntegrationTest;
@DaoNoSqlTest
public class MqttTimeseriesNoSqlProtoIntegrationTest extends AbstractMqttTimeseriesProtoIntegrationTest {

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.sql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesIntegrationTest;
@DaoSqlTest
public class MqttTimeseriesSqlIntegrationTest extends AbstractMqttTimeseriesIntegrationTest {

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.sql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesJsonIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesJsonIntegrationTest;
@DaoSqlTest
public class MqttTimeseriesSqlJsonIntegrationTest extends AbstractMqttTimeseriesJsonIntegrationTest {

View File

@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.telemetry.timeseries.sql;
package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.telemetry.timeseries.AbstractMqttTimeseriesProtoIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries.AbstractMqttTimeseriesProtoIntegrationTest;
@DaoSqlTest
public class MqttTimeseriesSqlProtoIntegrationTest extends AbstractMqttTimeseriesProtoIntegrationTest {

View File

@ -0,0 +1,21 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
public abstract class AbstractMqttV5Test extends AbstractMqttIntegrationTest {
}

View File

@ -0,0 +1,110 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Data
public class MqttV5TestCallback implements MqttCallback {
protected CountDownLatch subscribeLatch;
protected final CountDownLatch deliveryLatch;
protected int qoS;
protected byte[] payloadBytes;
protected String awaitSubTopic;
protected boolean pubAckReceived;
protected MqttMessage lastReceivedMessage;
public MqttV5TestCallback() {
this.subscribeLatch = new CountDownLatch(1);
this.deliveryLatch = new CountDownLatch(1);
}
public MqttV5TestCallback(int subscribeCount) {
this.subscribeLatch = new CountDownLatch(subscribeCount);
this.deliveryLatch = new CountDownLatch(1);
}
public MqttV5TestCallback(String awaitSubTopic) {
this.subscribeLatch = new CountDownLatch(1);
this.deliveryLatch = new CountDownLatch(1);
this.awaitSubTopic = awaitSubTopic;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
if (mqttDisconnectResponse.getException() != null) {
log.warn("connectionLost: ", mqttDisconnectResponse.getException());
deliveryLatch.countDown();
}
log.warn("Disconnected with reason: {}", mqttDisconnectResponse.getReasonString());
}
@Override
public void mqttErrorOccurred(MqttException e) {
log.warn("Error occurred:", e);
}
@Override
public void messageArrived(String requestTopic, MqttMessage mqttMessage) {
lastReceivedMessage = mqttMessage;
if (awaitSubTopic == null) {
log.warn("messageArrived on topic: {}", requestTopic);
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
} else {
messageArrivedOnAwaitSubTopic(requestTopic, mqttMessage);
}
}
protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
subscribeLatch.countDown();
}
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
log.warn("delivery complete: {}", iMqttToken.getResponse());
pubAckReceived = iMqttToken.getResponse().getType() == MqttWireMessage.MESSAGE_TYPE_PUBACK;
deliveryLatch.countDown();
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.warn("Connect completed: reconnect - {}, serverURI - {}", reconnect, serverURI);
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties mqttProperties) {
log.warn("Auth package received: reasonCode - {}, mqtt properties - {}", reasonCode, mqttProperties);
}
}

View File

@ -0,0 +1,175 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.thingsboard.server.common.data.StringUtils;
import java.util.concurrent.TimeUnit;
public class MqttV5TestClient { // We should copy part of MqttV3TestClient, due to different package names in import
private static final String MQTT_URL = "tcp://localhost:1883";
private static final int TIMEOUT = 30; // seconds
private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(TIMEOUT);
private final MqttAsyncClient client;
public void setCallback(MqttCallback callback) {
client.setCallback(callback);
}
public MqttV5TestClient() throws MqttException {
this.client = createClient();
}
public MqttV5TestClient(String clientId) throws MqttException {
this.client = createClient(clientId);
}
public MqttV5TestClient(boolean generateClientId) throws MqttException {
this.client = createClient(generateClientId);
}
public IMqttToken connectAndWait(String userName, String password) throws MqttException {
IMqttToken connect = connect(userName, password);
connect.waitForCompletion(TIMEOUT_MS);
return connect;
}
public IMqttToken connectAndWait(String userName) throws MqttException {
return connectAndWait(userName, null);
}
public IMqttToken connectAndWait() throws MqttException {
return connectAndWait(null, null);
}
public IMqttToken connectAndWait(MqttConnectionOptions options) throws MqttException {
IMqttToken iMqttToken = connect(options);
iMqttToken.waitForCompletion(TIMEOUT_MS);
return iMqttToken;
}
private IMqttToken connect(String userName, String password) throws MqttException {
if (client == null) {
throw new RuntimeException("Failed to connect! MqttAsyncClient is not initialized!");
}
MqttConnectionOptions options = new MqttConnectionOptions();
if (StringUtils.isNotEmpty(userName)) {
options.setUserName(userName);
}
if (StringUtils.isNotEmpty(password)) {
options.setPassword(password.getBytes());
}
return client.connect(options);
}
public IMqttToken connect(MqttConnectionOptions options) throws MqttException {
if (client == null) {
throw new RuntimeException("Failed to connect! MqttAsyncClient is not initialized!");
}
return client.connect(options);
}
public void disconnectAndWait() throws MqttException {
disconnect().waitForCompletion(TIMEOUT_MS);
}
public IMqttToken disconnect() throws MqttException {
return client.disconnect();
}
public void disconnectForcibly() throws MqttException {
client.disconnectForcibly(TIMEOUT_MS);
}
public IMqttToken publishAndWait(String topic, byte[] payload) throws MqttException {
IMqttToken iMqttToken = publish(topic, payload);
iMqttToken.waitForCompletion(TIMEOUT_MS);
return iMqttToken;
}
public IMqttToken publish(String topic, byte[] payload) throws MqttException {
MqttMessage message = new MqttMessage();
message.setPayload(payload);
return publish(topic, message);
}
public IMqttToken publish(String topic, MqttMessage message) throws MqttException {
return publish(topic, message.getPayload(), message.getQos(), message.isRetained());
}
public IMqttToken publish(String topic, byte[] payload, int qos, boolean retain) throws MqttException {
return client.publish(topic, payload, qos, retain);
}
public IMqttToken subscribeAndWait(String topic, MqttQoS qoS) throws MqttException {
IMqttToken iMqttToken = subscribe(topic, qoS);
iMqttToken.waitForCompletion(TIMEOUT_MS);
return iMqttToken;
}
public IMqttToken subscribe(String topic, MqttQoS qoS) throws MqttException {
return client.subscribe(topic, qoS.value());
}
public IMqttToken unsubscribeAndWait(String topic) throws MqttException {
IMqttToken iMqttToken = unsubscribe(topic);
iMqttToken.waitForCompletion(TIMEOUT_MS);
return iMqttToken;
}
public IMqttToken unsubscribe(String topic) throws MqttException {
return client.unsubscribe(topic);
}
public boolean isConnected() {
return client.isConnected();
}
public void enableManualAcks() {
client.setManualAcks(true);
}
public void messageArrivedComplete(MqttMessage mqttMessage) throws MqttException {
client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
}
private MqttAsyncClient createClient() throws MqttException {
return createClient(true);
}
private MqttAsyncClient createClient(boolean generateClientId) throws MqttException {
String clientId = null;
if (generateClientId) {
clientId = "test" + System.nanoTime();
}
return createClient(clientId);
}
private MqttAsyncClient createClient(String clientId) throws MqttException {
return new MqttAsyncClient(MQTT_URL, clientId, new MemoryPersistence());
}
}

View File

@ -0,0 +1,161 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.attributes;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.junit.Before;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class AbstractAttributesMqttV5Test extends AbstractMqttV5Test {
private static final String SHARED_ATTRIBUTES_PAYLOAD = "{\"sharedStr\":\"value1\",\"sharedBool\":true,\"sharedDbl\":42.0,\"sharedLong\":73," +
"\"sharedJson\":{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}}";
private static final String SHARED_ATTRIBUTES_DELETED_RESPONSE = "{\"deleted\":[\"sharedJson\"]}";
protected static final String PAYLOAD_VALUES_STR = "{\"key1\":\"value1\", \"key2\":true, \"key3\": 3.0, \"key4\": 4," +
" \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}";
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Attributes device")
.build();
processBeforeTest(configProperties);
}
protected void processAttributesPublishTest() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
client.publishAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, PAYLOAD_VALUES_STR.getBytes());
client.disconnectAndWait();
DeviceId deviceId = savedDevice.getId();
long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 5000;
List<String> actualKeys = null;
while (start <= end) {
actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/attributes/CLIENT_SCOPE", new TypeReference<>() {
});
if (actualKeys.size() == expectedKeys.size()) {
break;
}
Thread.sleep(100);
start += 100;
}
assertNotNull(actualKeys);
Set<String> actualKeySet = new HashSet<>(actualKeys);
Set<String> expectedKeySet = new HashSet<>(expectedKeys);
assertEquals(expectedKeySet, actualKeySet);
String getAttributesValuesUrl = getAttributesValuesUrl(deviceId, actualKeySet);
List<Map<String, Object>> values = doGetAsyncTyped(getAttributesValuesUrl, new TypeReference<>() {
});
assertAttributesValues(values, actualKeySet);
String deleteAttributesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet);
doDelete(deleteAttributesUrl);
}
protected void processAttributesUpdatesTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback();
client.setCallback(onUpdateCallback);
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", SHARED_ATTRIBUTES_PAYLOAD, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateUpdateAttributesResponse(onUpdateCallback, SHARED_ATTRIBUTES_PAYLOAD);
MqttV5TestCallback onDeleteCallback = new MqttV5TestCallback();
client.setCallback(onDeleteCallback);
doDelete("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/SHARED_SCOPE?keys=sharedJson", String.class);
onDeleteCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
validateUpdateAttributesResponse(onDeleteCallback, SHARED_ATTRIBUTES_DELETED_RESPONSE);
client.disconnect();
}
private String getAttributesValuesUrl(DeviceId deviceId, Set<String> actualKeySet) {
return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/attributes/CLIENT_SCOPE?keys=" + String.join(",", actualKeySet);
}
protected void assertAttributesValues(List<Map<String, Object>> deviceValues, Set<String> keySet) {
for (Map<String, Object> map : deviceValues) {
String key = (String) map.get("key");
Object value = map.get("value");
assertTrue(keySet.contains(key));
switch (key) {
case "key1":
assertEquals("value1", value);
break;
case "key2":
assertEquals(true, value);
break;
case "key3":
assertEquals(3.0, value);
break;
case "key4":
assertEquals(4, value);
break;
case "key5":
assertNotNull(value);
assertEquals(3, ((LinkedHashMap) value).size());
assertEquals(42, ((LinkedHashMap) value).get("someNumber"));
assertEquals(Arrays.asList(1, 2, 3), ((LinkedHashMap) value).get("someArray"));
LinkedHashMap<String, String> someNestedObject = (LinkedHashMap) ((LinkedHashMap) value).get("someNestedObject");
assertEquals("value", someNestedObject.get("key"));
break;
}
}
}
protected void validateUpdateAttributesResponse(MqttV5TestCallback callback, String expectedResponse) {
assertNotNull(callback.getPayloadBytes());
assertEquals(JacksonUtil.toJsonNode(expectedResponse), JacksonUtil.fromBytes(callback.getPayloadBytes()));
}
}

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.attributes.updates;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.attributes.AbstractAttributesMqttV5Test;
@Slf4j
@DaoSqlTest
public class AttributesUpdatesTest extends AbstractAttributesMqttV5Test {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Subscribe to attribute updates")
.transportPayloadType(TransportPayloadType.JSON)
.build();
processBeforeTest(configProperties);
}
@Test
public void testAttributeMqttV5SimpleClientUpdates() throws Exception {
processAttributesUpdatesTest();
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.attributes.upload;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.attributes.AbstractAttributesMqttV5Test;
@DaoSqlTest
public class AttributesPublishTest extends AbstractAttributesMqttV5Test {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Attributes device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testMqttV5AttributePublishTest() throws Exception {
processAttributesPublishTest();
}
}

View File

@ -0,0 +1,101 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.claim;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.ClaimRequest;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import org.thingsboard.server.dao.device.claim.ClaimResult;
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_CLAIM_TOPIC;
@Slf4j
public abstract class AbstractMqttV5ClaimTest extends AbstractMqttV5Test {
protected static final String CUSTOMER_USER_PASSWORD = "customerUser123!";
protected User customerAdmin;
protected Customer savedCustomer;
protected void processTestClaimingDevice() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
byte[] payloadBytes;
byte[] failurePayloadBytes;
payloadBytes = "{\"secretKey\":\"value\", \"durationMs\":60000}".getBytes();
failurePayloadBytes = "{\"secretKey\":\"value\", \"durationMs\":1}".getBytes();
validateClaimResponse(client, payloadBytes, failurePayloadBytes);
}
protected void validateClaimResponse(MqttV5TestClient client, byte[] payloadBytes, byte[] failurePayloadBytes) throws Exception {
client.publishAndWait(DEVICE_CLAIM_TOPIC, failurePayloadBytes);
loginUser(customerAdmin.getName(), CUSTOMER_USER_PASSWORD);
ClaimRequest claimRequest = new ClaimRequest("value");
ClaimResponse claimResponse = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest()),
20,
100
);
assertEquals(claimResponse, ClaimResponse.FAILURE);
client.publishAndWait(DEVICE_CLAIM_TOPIC, payloadBytes);
client.disconnect();
ClaimResult claimResult = doExecuteWithRetriesAndInterval(
() -> doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResult.class, status().isOk()),
20,
100
);
assertEquals(claimResult.getResponse(), ClaimResponse.SUCCESS);
Device claimedDevice = claimResult.getDevice();
assertNotNull(claimedDevice);
assertNotNull(claimedDevice.getCustomerId());
assertEquals(customerAdmin.getCustomerId(), claimedDevice.getCustomerId());
claimResponse = doPostClaimAsync("/api/customer/device/" + savedDevice.getName() + "/claim", claimRequest, ClaimResponse.class, status().isBadRequest());
assertEquals(claimResponse, ClaimResponse.CLAIMED);
}
protected void createCustomerAndUser() throws Exception {
Customer customer = new Customer();
customer.setTenantId(tenantId);
customer.setTitle("Test Claiming Customer");
savedCustomer = doPost("/api/customer", customer, Customer.class);
assertNotNull(savedCustomer);
assertEquals(tenantId, savedCustomer.getTenantId());
User user = new User();
user.setAuthority(Authority.CUSTOMER_USER);
user.setTenantId(tenantId);
user.setCustomerId(savedCustomer.getId());
user.setEmail("customer@thingsboard.org");
customerAdmin = createUser(user, CUSTOMER_USER_PASSWORD);
assertNotNull(customerAdmin);
assertEquals(customerAdmin.getCustomerId(), savedCustomer.getId());
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.claim;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5ClaimTest extends AbstractMqttV5ClaimTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Claim device")
.build();
processBeforeTest(configProperties);
createCustomerAndUser();
}
@Test
public void testClaimingDevice() throws Exception {
processTestClaimingDevice();
}
}

View File

@ -0,0 +1,111 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.connection;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.packet.MqttConnAck;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.concurrent.TimeUnit;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_CONNACK;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class AbstractMqttV5ClientConnectionTest extends AbstractMqttIntegrationTest {
protected void processClientWithCorrectAccessTokenTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
IMqttToken connectionResult = client.connectAndWait(accessToken);
MqttWireMessage response = connectionResult.getResponse();
Assert.assertEquals(MESSAGE_TYPE_CONNACK, response.getType());
MqttConnAck connAckMsg = (MqttConnAck) response;
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, connAckMsg.getReturnCode());
client.disconnect();
}
protected void processClientWithWrongAccessTokenTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
try {
client.connectAndWait("wrongAccessToken");
} catch (MqttException e) {
Assert.assertEquals(MqttReturnCode.RETURN_CODE_BAD_USERNAME_OR_PASSWORD, e.getReasonCode());
}
}
protected void processClientWithWrongClientIdAndEmptyUsernamePasswordTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient("unknownClientId");
try {
client.connectAndWait();
} catch (MqttException e) {
Assert.assertEquals(MqttReturnCode.RETURN_CODE_IDENTIFIER_NOT_VALID, e.getReasonCode());
}
}
protected void processClientWithNoCredentialsTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient(false);
try {
client.connectAndWait();
} catch (MqttException e) {
Assert.assertEquals(MqttReturnCode.RETURN_CODE_NOT_AUTHORIZED, e.getReasonCode());
}
}
protected void processClientWithPacketSizeLimitationTest() throws Exception {
int packetSizeLimit = 99;
MqttConnectionOptions options = new MqttConnectionOptions();
options.setMaximumPacketSize((long) packetSizeLimit);
options.setUserName(accessToken);
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(options);
MqttV5TestCallback possibleSizeCallback = updateAttributeWithStringValue(client, packetSizeLimit / 2);
Assert.assertTrue("Server should send messages if size less then limitation.", possibleSizeCallback.getPayloadBytes().length < packetSizeLimit);
MqttV5TestCallback bigMessageCallback = updateAttributeWithStringValue(client, packetSizeLimit * 2);
Assert.assertNull("Server should not send a message if the message size bigger then set limit.", bigMessageCallback.getLastReceivedMessage());
client.disconnect();
}
private MqttV5TestCallback updateAttributeWithStringValue(MqttV5TestClient client, int valueLen) throws Exception {
MqttV5TestCallback onUpdateCallback = new MqttV5TestCallback();
client.setCallback(onUpdateCallback);
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
String payload = "{\"sharedStr\":\"" + StringUtils.repeat("*", valueLen) + "\"}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + savedDevice.getId().getId() + "/attributes/SHARED_SCOPE", payload, String.class, status().isOk());
onUpdateCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
return onUpdateCallback;
}
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.connection;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5ClientConnectionTest extends AbstractMqttV5ClientConnectionTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test MqttV5 client device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testClientWithCorrectAccessToken() throws Exception {
processClientWithCorrectAccessTokenTest();
}
@Test
public void testClientWithWrongAccessToken() throws Exception {
processClientWithWrongAccessTokenTest();
}
@Test
public void testClientWithWrongClientIdAndEmptyUsernamePassword() throws Exception {
processClientWithWrongClientIdAndEmptyUsernamePasswordTest();
}
@Test
public void testClientWithNoCredentialsTest() throws Exception {
processClientWithNoCredentialsTest();
}
@Test
@Ignore("Not implemented on the server.")
public void testClientWithPacketSizeLimitation() throws Exception {
processClientWithPacketSizeLimitationTest();
}
}

View File

@ -0,0 +1,77 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.publish;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.common.packet.MqttPubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_PUBACK;
public abstract class AbstractMqttV5ClientPublishTest extends AbstractMqttIntegrationTest {
protected static final String PAYLOAD_VALUES_STR = "{\"key1\":\"value1\", \"key2\":true, \"key3\": 3.0, \"key4\": 4," +
" \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}";
protected static final String INVALID_PAYLOAD_VALUES_STR = "\"key1\":\"value1\", \"key2\":true, \"key3\": 3.0, \"key4\": 4," +
" \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}";
protected void processClientPublishToCorrectTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken publishResult = client.publishAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, PAYLOAD_VALUES_STR.getBytes());
MqttWireMessage response = publishResult.getResponse();
Assert.assertEquals(MESSAGE_TYPE_PUBACK, response.getType());
MqttPubAck pubAckMsg = (MqttPubAck) response;
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, pubAckMsg.getReturnCode());
client.disconnect();
}
protected void processClientPublishToWrongTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken iMqttToken = client.publishAndWait("wrong/topic/", PAYLOAD_VALUES_STR.getBytes());
Assert.assertEquals(MESSAGE_TYPE_PUBACK,iMqttToken.getResponse().getType());
MqttPubAck pubAck = (MqttPubAck) iMqttToken.getResponse();
Assert.assertEquals(MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, pubAck.getReturnCode());
client.disconnect();
}
protected void processClientPublishWithInvalidPayloadTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken iMqttToken = client.publishAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, INVALID_PAYLOAD_VALUES_STR.getBytes());
Assert.assertEquals(MESSAGE_TYPE_PUBACK,iMqttToken.getResponse().getType());
MqttPubAck pubAck = (MqttPubAck) iMqttToken.getResponse();
Assert.assertEquals(MqttReturnCode.RETURN_CODE_PAYLOAD_FORMAT_INVALID, pubAck.getReturnCode());
client.disconnect();
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.publish;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5ClientPublishTest extends AbstractMqttV5ClientPublishTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test MqttV5 client device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testClientPublishToCorrectTopic() throws Exception {
processClientPublishToCorrectTopicTest();
}
@Test
public void testClientPublishToWrongTopic() throws Exception {
processClientPublishToWrongTopicTest();
}
@Test
public void testClientPublishWithInvalidPayload() throws Exception {
processClientPublishWithInvalidPayloadTest();
}
}

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.subscribe;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_SUBACK;
public abstract class AbstractMqttV5ClientSubscriptionTest extends AbstractMqttIntegrationTest {
protected void processClientSubscriptionToCorrectTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken subscriptionResult = client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
MqttWireMessage response = subscriptionResult.getResponse();
Assert.assertEquals(MESSAGE_TYPE_SUBACK, response.getType());
MqttSubAck subAckMsg = (MqttSubAck) response;
Assert.assertEquals(1, subAckMsg.getReturnCodes().length);
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, subAckMsg.getReturnCodes()[0]);
client.disconnect();
}
protected void processClientSubscriptionToWrongTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken iMqttToken = client.subscribeAndWait("wrong/topic/+", MqttQoS.AT_MOST_ONCE);
Assert.assertEquals(MESSAGE_TYPE_SUBACK,iMqttToken.getResponse().getType());
MqttSubAck subAck = (MqttSubAck) iMqttToken.getResponse();
Assert.assertEquals(1, subAck.getReturnCodes().length);
Assert.assertEquals(MqttReturnCode.RETURN_CODE_TOPIC_FILTER_NOT_VALID, subAck.getReturnCodes()[0]);
client.disconnect();
}
}

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.subscribe;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5ClientSubscriptionTest extends AbstractMqttV5ClientSubscriptionTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test MqttV5 client device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testClientSubscriptionToCorrectTopic() throws Exception {
processClientSubscriptionToCorrectTopicTest();
}
@Test
public void testClientSubscriptionToWrongTopic() throws Exception {
processClientSubscriptionToWrongTopicTest();
}
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.unsubscribe;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.common.packet.MqttReturnCode;
import org.eclipse.paho.mqttv5.common.packet.MqttUnsubAck;
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
import org.junit.Assert;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import static org.eclipse.paho.mqttv5.common.packet.MqttWireMessage.MESSAGE_TYPE_UNSUBACK;
public abstract class AbstractMqttV5ClientUnsubscribeTest extends AbstractMqttIntegrationTest {
protected void processClientUnsubscribeFromCorrectTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
client.subscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC, MqttQoS.AT_MOST_ONCE);
IMqttToken unsubscribeResult = client.unsubscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
MqttWireMessage response = unsubscribeResult.getResponse();
Assert.assertEquals(MESSAGE_TYPE_UNSUBACK, response.getType());
MqttUnsubAck unsubAckMsg = (MqttUnsubAck) response;
Assert.assertEquals(1, unsubAckMsg.getReturnCodes().length);
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, unsubAckMsg.getReturnCodes()[0]);
client.disconnect();
}
protected void processClientUnsubscribeWithoutSubscribeTopicTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
IMqttToken iMqttToken = client.unsubscribeAndWait(MqttTopics.DEVICE_ATTRIBUTES_TOPIC);
Assert.assertEquals(MESSAGE_TYPE_UNSUBACK, iMqttToken.getResponse().getType());
MqttUnsubAck unsubAck = (MqttUnsubAck) iMqttToken.getResponse();
Assert.assertEquals(1, unsubAck.getReturnCodes().length);
Assert.assertEquals(MqttReturnCode.RETURN_CODE_NO_SUBSCRIPTION_EXISTED, unsubAck.getReturnCodes()[0]);
client.disconnect();
}
}

View File

@ -0,0 +1,44 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.client.unsubscribe;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5ClientUnsubscribeTest extends AbstractMqttV5ClientUnsubscribeTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test MqttV5 client device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testClientUnsubscribeFromCorrectTopic() throws Exception {
processClientUnsubscribeFromCorrectTopicTest();
}
@Test
public void testClientUnsubscribeWithoutSubscribeTopic() throws Exception {
processClientUnsubscribeWithoutSubscribeTopicTest();
}
}

View File

@ -0,0 +1,94 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.provision;
import com.fasterxml.jackson.databind.JsonNode;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.device.provision.ProvisionResponseStatus;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_PROVISION_REQUEST_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC;
@Slf4j
@DaoSqlTest
public class MqttV5ProvisionDeviceTest extends AbstractMqttV5Test {
@Autowired
DeviceCredentialsService deviceCredentialsService;
@Autowired
DeviceService deviceService;
@Test
public void testProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
processTestProvisioningCreateNewDeviceWithoutCredentials();
}
protected void processTestProvisioningCreateNewDeviceWithoutCredentials() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Provision device3")
.transportPayloadType(TransportPayloadType.JSON)
.provisionType(DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES)
.provisionKey("testProvisionKey")
.provisionSecret("testProvisionSecret")
.build();
super.processBeforeTest(configProperties);
byte[] result = createMqttClientAndPublish();
JsonNode response = JacksonUtil.fromBytes(result);
Assert.assertTrue(response.hasNonNull("credentialsType"));
Assert.assertTrue(response.hasNonNull("status"));
Device createdDevice = deviceService.findDeviceByTenantIdAndName(tenantId, "Test Provision device");
Assert.assertNotNull(createdDevice);
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, createdDevice.getId());
Assert.assertEquals(deviceCredentials.getCredentialsType().name(), response.get("credentialsType").asText());
Assert.assertEquals(ProvisionResponseStatus.SUCCESS.name(), response.get("status").asText());
}
protected byte[] createMqttClientAndPublish() throws Exception {
String provisionRequestMsg = "{\"deviceName\":\"Test Provision device\",\"provisionDeviceKey\":\"testProvisionKey\", \"provisionDeviceSecret\":\"testProvisionSecret\"}";
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait("provision");
MqttV5TestCallback onProvisionCallback = new MqttV5TestCallback(DEVICE_PROVISION_RESPONSE_TOPIC);
client.setCallback(onProvisionCallback);
client.subscribe(DEVICE_PROVISION_RESPONSE_TOPIC, MqttQoS.AT_MOST_ONCE);
client.publishAndWait(DEVICE_PROVISION_REQUEST_TOPIC, provisionRequestMsg.getBytes());
onProvisionCallback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
client.disconnect();
return onProvisionCallback.getPayloadBytes();
}
}

View File

@ -0,0 +1,98 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.rpc;
import com.nimbusds.jose.util.StandardCharset;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestCallback;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
@Slf4j
public abstract class AbstractMqttV5RpcTest extends AbstractMqttV5Test {
private static final String DEVICE_RESPONSE = "{\"value1\":\"A\",\"value2\":\"B\"}";
protected void processOneWayRpcTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
MqttV5TestCallback callback = new MqttV5TestCallback(DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0"));
client.setCallback(callback);
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_MOST_ONCE);
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}";
String result = doPostAsync("/api/rpc/oneway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk());
assertTrue(StringUtils.isEmpty(result));
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes()));
assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS());
client.disconnect();
}
protected void processJsonTwoWayRpcTest() throws Exception {
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
client.subscribeAndWait(DEVICE_RPC_REQUESTS_SUB_TOPIC, MqttQoS.AT_LEAST_ONCE);
MqttV5TestRpcCallback callback = new MqttV5TestRpcCallback(client, DEVICE_RPC_REQUESTS_SUB_TOPIC.replace("+", "0"));
client.setCallback(callback);
String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"26\",\"value\": 1}}";
String actualRpcResponse = doPostAsync("/api/rpc/twoway/" + savedDevice.getId(), setGpioRequest, String.class, status().isOk());
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(JacksonUtil.toJsonNode(setGpioRequest), JacksonUtil.fromBytes(callback.getPayloadBytes()));
assertEquals("{\"value1\":\"A\",\"value2\":\"B\"}", actualRpcResponse);
client.disconnect();
}
protected class MqttV5TestRpcCallback extends MqttV5TestCallback {
private final MqttV5TestClient client;
public MqttV5TestRpcCallback(MqttV5TestClient client, String awaitSubTopic) {
super(awaitSubTopic);
this.client = client;
}
@Override
protected void messageArrivedOnAwaitSubTopic(String requestTopic, MqttMessage mqttMessage) {
log.warn("messageArrived on topic: {}, awaitSubTopic: {}", requestTopic, awaitSubTopic);
if (awaitSubTopic.equals(requestTopic)) {
qoS = mqttMessage.getQos();
payloadBytes = mqttMessage.getPayload();
String responseTopic = requestTopic.replace("request", "response");
try {
client.publish(responseTopic, DEVICE_RESPONSE.getBytes(StandardCharset.UTF_8));
} catch (MqttException e) {
log.warn("Failed to publish response on topic: {} due to: ", responseTopic, e);
}
subscribeLatch.countDown();
}
}
}
}

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.rpc;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5RpcTest extends AbstractMqttV5RpcTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("RPC test device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testServerMqttV5SimpleClientOneWayRpc() throws Exception {
processOneWayRpcTest();
}
@Test
public void testServerMqttV5SimpleClientTwoWayRpc() throws Exception {
processJsonTwoWayRpcTest();
}
}

View File

@ -0,0 +1,132 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.timeseries;
import com.fasterxml.jackson.core.type.TypeReference;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.mqttv5.AbstractMqttV5Test;
import org.thingsboard.server.transport.mqtt.mqttv5.MqttV5TestClient;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public abstract class AbstractMqttV5TimeseriesTest extends AbstractMqttV5Test {
protected static final String PAYLOAD_VALUES_STR = "{\"key1\":\"value1\", \"key2\":true, \"key3\": 3.0, \"key4\": 4," +
" \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}";
protected void processTimeseriesMqttV5UploadTest() throws Exception {
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
MqttV5TestClient client = new MqttV5TestClient();
client.connectAndWait(accessToken);
client.publishAndWait(MqttTopics.DEVICE_TELEMETRY_TOPIC, PAYLOAD_VALUES_STR.getBytes());
client.disconnect();
DeviceId deviceId = savedDevice.getId();
List<String> actualKeys = getActualKeysList(deviceId, expectedKeys);
assertNotNull(actualKeys);
Set<String> actualKeySet = new HashSet<>(actualKeys);
Set<String> expectedKeySet = new HashSet<>(expectedKeys);
assertEquals(expectedKeySet, actualKeySet);
String getTelemetryValuesUrl;
getTelemetryValuesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?keys=" + String.join(",", actualKeySet);
long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 5000;
Map<String, List<Map<String, Object>>> values = null;
while (start <= end) {
values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() {
});
boolean valid = values.size() == expectedKeys.size();
if (valid) {
for (String key : expectedKeys) {
List<Map<String, Object>> tsValues = values.get(key);
if (tsValues != null && tsValues.size() > 0) {
Object ts = tsValues.get(0).get("ts");
if (ts == null) {
valid = false;
break;
}
} else {
valid = false;
break;
}
}
}
if (valid) {
break;
}
Thread.sleep(100);
start += 100;
}
assertNotNull(values);
assertValues(values);
}
private List<String> getActualKeysList(DeviceId deviceId, List<String> expectedKeys) throws Exception {
long start = System.currentTimeMillis();
long end = System.currentTimeMillis() + 3000;
List<String> actualKeys = null;
while (start <= end) {
actualKeys = doGetAsyncTyped("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", new TypeReference<>() {
});
if (actualKeys.size() == expectedKeys.size()) {
break;
}
Thread.sleep(100);
start += 100;
}
return actualKeys;
}
private void assertValues(Map<String, List<Map<String, Object>>> deviceValues) {
for (Map.Entry<String, List<Map<String, Object>>> entry : deviceValues.entrySet()) {
String key = entry.getKey();
List<Map<String, Object>> tsKv = entry.getValue();
String value = (String) tsKv.get(0).get("value");
switch (key) {
case "key1":
assertEquals("value1", value);
break;
case "key2":
assertEquals("true", value);
break;
case "key3":
assertEquals("3.0", value);
break;
case "key4":
assertEquals("4", value);
break;
case "key5":
assertEquals("{\"someNumber\":42,\"someArray\":[1,2,3],\"someNestedObject\":{\"key\":\"value\"}}", value);
break;
}
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.mqttv5.timeseries;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
@DaoSqlTest
public class MqttV5TimeseriesTest extends AbstractMqttV5TimeseriesTest {
@Before
public void beforeTest() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.deviceName("Test Post Telemetry device")
.build();
processBeforeTest(configProperties);
}
@Test
public void testTimeseriesMqttV5SimpleClientUpload() throws Exception {
processTimeseriesMqttV5UploadTest();
}
}

View File

@ -26,6 +26,7 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@ -35,17 +36,18 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.device.profile.MqttTopics;
import org.thingsboard.server.common.data.id.DeviceId;
@ -72,6 +74,8 @@ import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver;
import javax.net.ssl.SSLPeerUnverifiedException;
import java.io.IOException;
@ -79,6 +83,7 @@ import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -90,17 +95,13 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static com.amazonaws.util.StringUtils.UTF8;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.CONNECT;
import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED;
import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN;
@ -355,10 +356,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
break;
default:
ack(ctx, msgId);
ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID);
}
} catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ctx.close();
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@ -447,7 +449,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
attrReqTopicType = TopicType.V2;
} else {
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
ack(ctx, msgId);
ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID);
}
} catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@ -456,9 +458,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) {
if (deviceSessionCtx.isSendAckOnValidationException() && msgId > 0) {
if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) {
log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.PAYLOAD_FORMAT_INVALID));
} else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close();
@ -500,9 +502,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
}
private void ack(ChannelHandlerContext ctx, int msgId) {
private void ack(ChannelHandlerContext ctx, int msgId, ReturnCode returnCode) {
if (msgId > 0) {
ctx.writeAndFlush(createMqttPubAckMsg(msgId));
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode));
}
}
@ -511,7 +513,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess(Void dummy) {
log.trace("[{}] Published msg: {}", sessionId, msg);
ack(ctx, msgId);
ack(ctx, msgId, ReturnCode.SUCCESS);
}
@Override
@ -536,7 +538,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onSuccess(TransportProtos.ProvisionDeviceResponseMsg provisionResponseMsg) {
log.trace("[{}] Published msg: {}", sessionId, msg);
ack(ctx, msgId);
ack(ctx, msgId, ReturnCode.SUCCESS);
try {
if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) {
deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush);
@ -545,13 +547,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
scheduler.schedule((Callable<ChannelFuture>) ctx::close, 60, TimeUnit.SECONDS);
} catch (Exception e) {
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
log.trace("[{}] Failed to convert device provision response to MQTT msg", sessionId, e);
}
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ctx.close();
}
}
@ -593,7 +596,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void sendOtaPackage(ChannelHandlerContext ctx, int msgId, String firmwareId, String requestId, int chunkSize, int chunk, OtaPackageType type) {
log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId);
ack(ctx, msgId);
ack(ctx, msgId, ReturnCode.SUCCESS);
try {
byte[] firmwareChunk = context.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk);
deviceSessionCtx.getPayloadAdaptor()
@ -614,6 +617,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
int returnCode = ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.NOT_AUTHORIZED_5);
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(returnCode)));
return;
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
@ -684,12 +689,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
break;
default:
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(FAILURE.value());
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.TOPIC_FILTER_INVALID));
break;
}
} catch (Exception e) {
log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e);
grantedQoSList.add(FAILURE.value());
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.IMPLEMENTATION_SPECIFIC));
}
}
if (!activityReported) {
@ -717,76 +722,93 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(ReturnCode.NOT_AUTHORIZED_5.shortValue())));
return;
}
boolean activityReported = false;
List<Short> unSubResults = new ArrayList<>();
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
for (String topicName : mqttMsg.payload().topics()) {
mqttQoSMap.remove(new MqttTopicMatcher(topicName));
try {
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: {
activityReported = true;
break;
MqttTopicMatcher matcher = new MqttTopicMatcher(topicName);
if (mqttQoSMap.containsKey(matcher)) {
mqttQoSMap.remove(matcher);
try {
short resultValue = ReturnCode.SUCCESS.shortValue();
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_SHORT_PROTO_TOPIC: {
transportService.process(deviceSessionCtx.getSessionInfo(),
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_SHORT_PROTO_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_JSON_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_SHORT_PROTO_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_ERROR_TOPIC: {
activityReported = true;
break;
}
default:
log.trace("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
resultValue = ReturnCode.TOPIC_FILTER_INVALID.shortValue();
}
unSubResults.add(resultValue);
} catch (Exception e) {
log.debug("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
unSubResults.add(ReturnCode.IMPLEMENTATION_SPECIFIC.shortValue());
}
} catch (Exception e) {
log.debug("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
} else {
log.debug("[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found", sessionId, mqttMsg.variableHeader().messageId(), topicName);
unSubResults.add(ReturnCode.NO_SUBSCRIPTION_EXISTED.shortValue());
}
}
if (!activityReported) {
transportService.reportActivity(deviceSessionCtx.getSessionInfo());
}
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), unSubResults));
}
private MqttMessage createUnSubAckMessage(int msgId) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(UNSUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId);
return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader);
private MqttMessage createUnSubAckMessage(int msgId, List<Short> resultCodes) {
MqttMessageBuilders.UnsubAckBuilder unsubAckBuilder = MqttMessageBuilders.unsubAck();
unsubAckBuilder.packetId(msgId);
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
unsubAckBuilder.addReasonCodes(resultCodes.toArray(Short[]::new));
}
return unsubAckBuilder.build();
}
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());
String userName = msg.payload().userName();
String clientId = msg.payload().clientIdentifier();
deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));
if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
deviceSessionCtx.setProvisionOnly(true);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, msg));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));
} else {
X509Certificate cert;
if (sslHandler != null && (cert = getX509Certificate()) != null) {
@ -820,7 +842,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close();
}
});
@ -843,13 +865,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close();
}
});
} catch (Exception e) {
context.onAuthFailure(address);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.NOT_AUTHORIZED_5, connectMessage));
log.trace("[{}] X509 auth failure: {}", sessionId, address, e);
ctx.close();
}
@ -868,12 +890,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return null;
}
private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(CONNACK, false, AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader mqttConnAckVariableHeader =
new MqttConnAckVariableHeader(returnCode, !msg.variableHeader().isCleanSession());
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
private MqttConnAckMessage createMqttConnAckMsg(ReturnCode returnCode, MqttConnectMessage msg) {
MqttMessageBuilders.ConnAckBuilder connAckBuilder = MqttMessageBuilders.connAck();
connAckBuilder.sessionPresent(!msg.variableHeader().isCleanSession());
MqttConnectReturnCode finalReturnCode = ReturnCodeResolver.getConnectionReturnCode(deviceSessionCtx.getMqttVersion(), returnCode);
connAckBuilder.returnCode(finalReturnCode);
return connAckBuilder.build();
}
@Override
@ -918,12 +940,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
return Math.min(reqQoS.value(), MAX_SUPPORTED_QOS_LVL.value());
}
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(PUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMsgIdVariableHeader =
MqttMessageIdVariableHeader.from(requestId);
return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader);
private static MqttVersion getMqttVersion(int versionCode) {
switch (versionCode) {
case 3:
return MqttVersion.MQTT_3_1;
case 5:
return MqttVersion.MQTT_5;
default:
return MqttVersion.MQTT_3_1_1;
}
}
public static MqttMessage createMqttPubAckMsg(DeviceSessionCtx deviceSessionCtx, int requestId, ReturnCode returnCode) {
MqttMessageBuilders.PubAckBuilder pubAckMsgBuilder = MqttMessageBuilders.pubAck().packetId(requestId);
if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) {
pubAckMsgBuilder.reasonCode(returnCode.byteValue());
}
return pubAckMsgBuilder.build();
}
private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) {
@ -976,7 +1009,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
if (!msg.hasDeviceInfo()) {
context.onAuthFailure(address);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED, connectMessage));
ReturnCode returnCode = ReturnCode.NOT_AUTHORIZED_5;
if (sslHandler == null || getX509Certificate() == null) {
String username = connectMessage.payload().userName();
byte[] passwordBytes = connectMessage.payload().passwordInBytes();
String clientId = connectMessage.payload().clientIdentifier();
if ((username != null && passwordBytes != null && clientId != null)
|| (username == null ^ passwordBytes == null)) {
returnCode = ReturnCode.BAD_USERNAME_OR_PASSWORD;
} else if (!StringUtils.isBlank(clientId)) {
returnCode = ReturnCode.CLIENT_IDENTIFIER_NOT_VALID;
}
}
ctx.writeAndFlush(createMqttConnAckMsg(returnCode, connectMessage));
ctx.close();
} else {
context.onAuthSuccess(address);
@ -988,7 +1033,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onSuccess(Void msg) {
SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this);
checkGatewaySession(sessionMetaData);
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, connectMessage));
deviceSessionCtx.setConnected(true);
log.debug("[{}] Client connected!", sessionId);
transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread.
@ -1001,7 +1046,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else {
log.warn("[{}] Failed to submit session event", sessionId, e);
}
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, connectMessage));
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close();
}
});

View File

@ -19,6 +19,7 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.ReferenceCountUtil;
import lombok.Getter;
import lombok.Setter;
@ -74,6 +75,10 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
@Setter
private boolean provisionOnly = false;
@Getter
@Setter
private MqttVersion mqttVersion;
private volatile MqttTopicFilter telemetryTopicFilter = MqttTopicFilterFactory.getDefaultTelemetryFilter();
private volatile MqttTopicFilter attributesTopicFilter = MqttTopicFilterFactory.getDefaultAttributesFilter();
private volatile TransportPayloadType payloadType = TransportPayloadType.JSON;

View File

@ -53,6 +53,7 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.util.ReturnCode;
import javax.annotation.Nullable;
import java.util.Collections;
@ -220,7 +221,7 @@ public class GatewaySessionHandler {
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
@Override
public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
ack(msg);
ack(msg, ReturnCode.SUCCESS);
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
}
@ -336,7 +337,7 @@ public class GatewaySessionHandler {
private void processOnDisconnect(MqttPublishMessage msg, String deviceName) {
deregisterSession(deviceName);
ack(msg);
ack(msg, ReturnCode.SUCCESS);
}
private void onDeviceTelemetryJson(int msgId, ByteBuf payload) throws AdaptorException {
@ -663,7 +664,7 @@ public class GatewaySessionHandler {
@Override
public void onFailure(Throwable t) {
ack(mqttMsg);
ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC);
log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t);
}
}, context.getExecutor());
@ -716,10 +717,10 @@ public class GatewaySessionHandler {
return ProtoMqttAdaptor.toBytes(payload);
}
private void ack(MqttPublishMessage msg) {
private void ack(MqttPublishMessage msg, ReturnCode returnCode) {
int msgId = getMsgId(msg);
if (msgId > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode));
}
}
@ -735,7 +736,7 @@ public class GatewaySessionHandler {
public void onSuccess(Void dummy) {
log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg);
if (msgId > 0) {
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msgId));
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS));
}
}

View File

@ -0,0 +1,104 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util;
public enum ReturnCode {
SUCCESS((byte) 0x00),
//MQTT 3 codes
UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01),
IDENTIFIER_REJECTED((byte) 0x02),
SERVER_UNAVAILABLE((byte) 0x03),
BAD_USER_NAME_OR_PASSWORD((byte) 0x04),
NOT_AUTHORIZED((byte) 0x05),
//MQTT 5 codes
NO_MATCHING_SUBSCRIBERS((byte) 0x10),
NO_SUBSCRIPTION_EXISTED((byte) 0x11),
CONTINUE_AUTHENTICATION((byte) 0x18),
REAUTHENTICATE((byte) 0x19),
UNSPECIFIED_ERROR((byte) 0x80),
MALFORMED_PACKET((byte) 0x81),
PROTOCOL_ERROR((byte) 0x82),
IMPLEMENTATION_SPECIFIC((byte) 0x83),
UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84),
CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85),
BAD_USERNAME_OR_PASSWORD((byte) 0x86),
NOT_AUTHORIZED_5((byte) 0x87),
SERVER_UNAVAILABLE_5((byte) 0x88),
SERVER_BUSY((byte) 0x89),
BANNED((byte) 0x8A),
SERVER_SHUTTING_DOWN((byte) 0x8B),
BAD_AUTHENTICATION_METHOD((byte) 0x8C),
KEEP_ALIVE_TIMEOUT((byte) 0x8D),
SESSION_TAKEN_OVER((byte) 0x8E),
TOPIC_FILTER_INVALID((byte) 0x8F),
TOPIC_NAME_INVALID((byte) 0x90),
PACKET_IDENTIFIER_IN_USE((byte) 0x91),
PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92),
RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93),
TOPIC_ALIAS_INVALID((byte) 0x94),
PACKET_TOO_LARGE((byte) 0x95),
MESSAGE_RATE_TOO_HIGH((byte) 0x96),
QUOTA_EXCEEDED((byte) 0x97),
ADMINISTRATIVE_ACTION((byte) 0x98),
PAYLOAD_FORMAT_INVALID((byte) 0x99),
RETAIN_NOT_SUPPORTED((byte) 0x9A),
QOS_NOT_SUPPORTED((byte) 0x9B),
USE_ANOTHER_SERVER((byte) 0x9C),
SERVER_MOVED((byte) 0x9D),
SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E),
CONNECTION_RATE_EXCEEDED((byte) 0x9F),
MAXIMUM_CONNECT_TIME((byte) 0xA0),
SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1),
WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2);
private static final ReturnCode[] VALUES;
static {
ReturnCode[] values = values();
VALUES = new ReturnCode[163];
for (ReturnCode code : values) {
final int unsignedByte = code.byteValue & 0xFF;
// Suppress a warning about out of bounds access since the enum contains only correct values
VALUES[unsignedByte] = code; // lgtm [java/index-out-of-bounds]
}
}
private final byte byteValue;
ReturnCode(byte byteValue) {
this.byteValue = byteValue;
}
public byte byteValue() {
return byteValue;
}
public short shortValue(){return byteValue;}
public static ReturnCode valueOf(byte b) {
final int unsignedByte = b & 0xFF;
ReturnCode mqttConnectReturnCode = null;
try {
mqttConnectReturnCode = VALUES[unsignedByte];
} catch (ArrayIndexOutOfBoundsException ignored) {
// no op
}
if (mqttConnectReturnCode == null) {
throw new IllegalArgumentException("unknown connect return code: " + unsignedByte);
}
return mqttConnectReturnCode;
}
}

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.util;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ReturnCodeResolver {
public static MqttConnectReturnCode getConnectionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) {
switch (returnCode) {
case BAD_USERNAME_OR_PASSWORD:
return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
case NOT_AUTHORIZED_5:
return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
case SERVER_UNAVAILABLE_5:
return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
case CLIENT_IDENTIFIER_NOT_VALID:
return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
default:
log.warn("Unknown return code for conversion: {}", returnCode.name());
}
}
return MqttConnectReturnCode.valueOf(returnCode.byteValue());
}
public static int getSubscriptionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) {
if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) {
switch (returnCode) {
case UNSPECIFIED_ERROR:
case TOPIC_FILTER_INVALID:
case IMPLEMENTATION_SPECIFIC:
case NOT_AUTHORIZED_5:
case PACKET_IDENTIFIER_IN_USE:
case QUOTA_EXCEEDED:
case SHARED_SUBSCRIPTION_NOT_SUPPORTED:
case SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED:
case WILDCARD_SUBSCRIPTION_NOT_SUPPORTED:
return MqttQoS.FAILURE.value();
}
}
return returnCode.byteValue();
}
}

View File

@ -80,6 +80,7 @@
<tbel.version>1.0.3</tbel.version>
<lombok.version>1.18.18</lombok.version>
<paho.client.version>1.2.4</paho.client.version>
<paho.mqttv5.client.version>1.2.5</paho.mqttv5.client.version>
<netty.version>4.1.75.Final</netty.version>
<netty-tcnative.version>2.0.51.Final</netty-tcnative.version>
<os-maven-plugin.version>1.7.0</os-maven-plugin.version>
@ -1685,6 +1686,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho.client.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>${paho.mqttv5.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>