diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index f3e29dc1ad..74ecdb2a76 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.mqtt.rpc; import com.datastax.driver.core.utils.UUIDs; +import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; @@ -23,19 +24,19 @@ import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.controller.AbstractControllerTest; +import org.thingsboard.server.mqtt.telemetry.AbstractMqttTelemetryIntegrationTest; import org.thingsboard.server.service.security.AccessValidator; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -101,13 +102,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(accessToken); client.connect(options).waitForCompletion(); - client.subscribe("v1/devices/me/rpc/request/+", 1); - client.setCallback(new TestMqttCallback(client)); + + TestMqttCallback callback = new TestMqttCallback(client); + client.setCallback(callback); + CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + client.subscribe("v1/devices/me/rpc/request/+", MqttQoS.AT_MOST_ONCE.value()); String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); String result = doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isOk()); Assert.assertTrue(StringUtils.isEmpty(result)); + latch.await(3, TimeUnit.SECONDS); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); } @Test @@ -204,11 +211,16 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC private static class TestMqttCallback implements MqttCallback { private final MqttAsyncClient client; + private Integer qoS; TestMqttCallback(MqttAsyncClient client) { this.client = client; } + int getQoS() { + return qoS; + } + @Override public void connectionLost(Throwable throwable) { } @@ -219,6 +231,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC MqttMessage message = new MqttMessage(); String responseTopic = requestTopic.replace("request", "response"); message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); + qoS = mqttMessage.getQos(); client.publish(responseTopic, message); } diff --git a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java index d02d07dff9..df9601e735 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/telemetry/AbstractMqttTelemetryIntegrationTest.java @@ -15,10 +15,9 @@ */ package org.thingsboard.server.mqtt.telemetry; +import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.*; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -30,9 +29,12 @@ import org.thingsboard.server.dao.service.DaoNoSqlTest; import java.net.URI; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; /** * @author Valerii Sosliuk @@ -94,4 +96,60 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr assertEquals("3.0", values.get("key3").get(0).get("value")); assertEquals("4", values.get("key4").get(0).get("value")); } + + @Test + public void testMqttQoSLevel() throws Exception { + String clientId = MqttAsyncClient.generateClientId(); + MqttAsyncClient client = new MqttAsyncClient(MQTT_URL, clientId); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(accessToken); + client.connect(options).waitForCompletion(3000); + TestMqttCallback callback = new TestMqttCallback(client); + client.setCallback(callback); + CountDownLatch latch = new CountDownLatch(1); + latch.countDown(); + client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); + String payload = "{\"key\":\"value\"}"; + String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); + latch.await(3, TimeUnit.SECONDS); + assertEquals(payload, callback.getPayload()); + assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); + } + + private static class TestMqttCallback implements MqttCallback { + + private final MqttAsyncClient client; + private Integer qoS; + private String payload; + + String getPayload() { + return payload; + } + + TestMqttCallback(MqttAsyncClient client) { + this.client = client; + } + + int getQoS() { + return qoS; + } + + @Override + public void connectionLost(Throwable throwable) { + } + + @Override + public void messageArrived(String requestTopic, MqttMessage mqttMessage) { + payload = new String(mqttMessage.getPayload()); + qoS = mqttMessage.getQos(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + } + + } diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java new file mode 100644 index 0000000000..b4a20283f7 --- /dev/null +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTopicMatcher.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.mqtt; + +import java.util.regex.Pattern; + +public class MqttTopicMatcher { + + private final String topic; + private final Pattern topicRegex; + + MqttTopicMatcher(String topic) { + if(topic == null){ + throw new NullPointerException("topic"); + } + this.topic = topic; + this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$"); + } + + public String getTopic() { + return topic; + } + + public boolean matches(String topic){ + return this.topicRegex.matcher(topic).matches(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MqttTopicMatcher that = (MqttTopicMatcher) o; + + return topic.equals(that.topic); + } + + @Override + public int hashCode() { + return topic.hashCode(); + } +} diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index ed0c8c642c..3c8365e27d 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.regex.Pattern; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; import static io.netty.handler.codec.mqtt.MqttMessageType.*; @@ -78,7 +79,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private final RelationService relationService; private final QuotaService quotaService; private final SslHandler sslHandler; - private final ConcurrentMap mqttQoSMap; + private final ConcurrentMap mqttQoSMap; private volatile boolean connected; private volatile InetSocketAddress address; @@ -278,7 +279,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void registerSubQoS(String topic, List grantedQoSList, MqttQoS reqQoS) { grantedQoSList.add(getMinSupportedQos(reqQoS)); - mqttQoSMap.put(topic, getMinSupportedQos(reqQoS)); + mqttQoSMap.put(new MqttTopicMatcher(topic), getMinSupportedQos(reqQoS)); } private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { @@ -287,7 +288,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); for (String topicName : mqttMsg.payload().topics()) { - mqttQoSMap.remove(topicName); + mqttQoSMap.remove(new MqttTopicMatcher(topicName)); try { switch (topicName) { case DEVICE_ATTRIBUTES_TOPIC: { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java index 3dbb3efe0e..256ed119da 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/DeviceSessionCtx.java @@ -16,7 +16,10 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttQoS; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.SessionId; import org.thingsboard.server.common.msg.session.SessionActorToAdaptorMsg; @@ -27,10 +30,9 @@ import org.thingsboard.server.common.msg.session.ex.SessionException; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; -import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +48,7 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile boolean allowAttributeResponses; private AtomicInteger msgIdSeq = new AtomicInteger(0); - public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap mqttQoSMap) { + public DeviceSessionCtx(SessionMsgProcessor processor, DeviceAuthService authService, MqttTransportAdaptor adaptor, ConcurrentMap mqttQoSMap) { super(processor, authService, mqttQoSMap); this.adaptor = adaptor; this.sessionId = new MqttSessionId(); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index e5be5c71a1..bbc9c086a8 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.msg.session.*; import org.thingsboard.server.common.msg.session.ex.SessionException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.MqttTopics; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; @@ -58,7 +59,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext { private volatile boolean closed; private AtomicInteger msgIdSeq = new AtomicInteger(0); - public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap mqttQoSMap) { + public GatewayDeviceSessionCtx(GatewaySessionCtx parent, Device device, ConcurrentMap mqttQoSMap) { super(parent.getProcessor(), parent.getAuthService(), device, mqttQoSMap); this.parent = parent; this.sessionId = new MqttSessionId(); diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java index 98ad6d2c2c..b9291b794a 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionCtx.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; @@ -64,7 +65,7 @@ public class GatewaySessionCtx { private final DeviceAuthService authService; private final RelationService relationService; private final Map devices; - private final ConcurrentMap mqttQoSMap; + private final ConcurrentMap mqttQoSMap; private ChannelHandlerContext channel; public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) { diff --git a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java index f085064016..0752331f5a 100644 --- a/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java +++ b/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/MqttDeviceAwareSessionContext.java @@ -20,35 +20,42 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.transport.SessionMsgProcessor; import org.thingsboard.server.common.transport.auth.DeviceAuthService; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; +import org.thingsboard.server.transport.mqtt.MqttTopicMatcher; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; /** * Created by ashvayka on 30.08.18. */ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext { - private final ConcurrentMap mqttQoSMap; + private final ConcurrentMap mqttQoSMap; - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap mqttQoSMap) { + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap mqttQoSMap) { super(processor, authService); this.mqttQoSMap = mqttQoSMap; } - public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap mqttQoSMap) { + public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap mqttQoSMap) { super(processor, authService, device); this.mqttQoSMap = mqttQoSMap; } - public ConcurrentMap getMqttQoSMap() { + public ConcurrentMap getMqttQoSMap() { return mqttQoSMap; } public MqttQoS getQoSForTopic(String topic) { - Integer qos = mqttQoSMap.get(topic); - if (qos != null) { - return MqttQoS.valueOf(qos); + List qosList = mqttQoSMap.entrySet() + .stream() + .filter(entry -> entry.getKey().matches(topic)) + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + if (!qosList.isEmpty()) { + return MqttQoS.valueOf(qosList.get(0)); } else { return MqttQoS.AT_LEAST_ONCE; }