diff --git a/application/pom.xml b/application/pom.xml
index a4602d4b5d..df6195fd24 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -91,8 +91,6 @@
org.eclipse.paho
org.eclipse.paho.client.mqttv3
- 1.1.0
- test
org.cassandraunit
@@ -437,6 +435,11 @@
extension-kafka
extension
+
+ org.thingsboard.extensions
+ extension-mqtt
+ extension
+
diff --git a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
index 784f30b44d..1eca470904 100644
--- a/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
+++ b/extensions/extension-kafka/src/main/java/org/thingsboard/server/extensions/kafka/plugin/KafkaPlugin.java
@@ -68,6 +68,7 @@ public class KafkaPlugin extends AbstractPlugin {
this.producer.close();
} catch (Exception e) {
log.error("Failed to close producer during destroy()", e);
+ throw new RuntimeException(e);
}
}
diff --git a/extensions/extension-mqtt/pom.xml b/extensions/extension-mqtt/pom.xml
new file mode 100644
index 0000000000..1d12eeb287
--- /dev/null
+++ b/extensions/extension-mqtt/pom.xml
@@ -0,0 +1,98 @@
+
+
+
+ 4.0.0
+
+ org.thingsboard
+ 1.3.0-SNAPSHOT
+ extensions
+
+ org.thingsboard.extensions
+ extension-mqtt
+ jar
+
+ Thingsboard Server MQTT Extension
+ http://thingsboard.org
+
+
+ UTF-8
+ ${basedir}/../..
+
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ provided
+
+
+ ch.qos.logback
+ logback-core
+ provided
+
+
+ ch.qos.logback
+ logback-classic
+ provided
+
+
+ org.thingsboard
+ extensions-api
+ provided
+
+
+ org.thingsboard
+ extensions-core
+ provided
+
+
+ org.apache.velocity
+ velocity
+ provided
+
+
+ org.apache.velocity
+ velocity-tools
+ provided
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/assembly/extension.xml
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/extensions/extension-mqtt/src/assembly/extension.xml b/extensions/extension-mqtt/src/assembly/extension.xml
new file mode 100644
index 0000000000..1229956fa0
--- /dev/null
+++ b/extensions/extension-mqtt/src/assembly/extension.xml
@@ -0,0 +1,34 @@
+
+
+ extension
+
+ jar
+
+ false
+
+
+ /
+ true
+ true
+ runtime
+
+
+
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionMsg.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionMsg.java
new file mode 100644
index 0000000000..087696c687
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionMsg.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.action;
+
+import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
+
+public class MqttActionMsg extends AbstractRuleToPluginMsg {
+
+ public MqttActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, MqttActionPayload payload) {
+ super(tenantId, customerId, deviceId, payload);
+ }
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionPayload.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionPayload.java
new file mode 100644
index 0000000000..dcdfdc13e4
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttActionPayload.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.action;
+
+import lombok.Builder;
+import lombok.Data;
+import org.thingsboard.server.common.msg.session.MsgType;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+public class MqttActionPayload implements Serializable {
+
+ private final boolean sync;
+ private final String topic;
+ private final String msgBody;
+
+ private final Integer requestId;
+ private final MsgType msgType;
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginAction.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginAction.java
new file mode 100644
index 0000000000..5d3ae487cb
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginAction.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.action;
+
+import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
+import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
+import org.thingsboard.server.extensions.api.component.Action;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
+import org.thingsboard.server.extensions.api.rules.RuleContext;
+import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction;
+
+import java.util.Optional;
+
+@Action(name = "Mqtt Plugin Action", descriptor = "MqttActionDescriptor.json", configuration = MqttPluginActionConfiguration.class)
+public class MqttPluginAction extends AbstractTemplatePluginAction {
+
+ @Override
+ protected Optional> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
+ MqttActionPayload.MqttActionPayloadBuilder builder = MqttActionPayload.builder();
+ builder.sync(configuration.isSync());
+ builder.msgType(payload.getMsgType());
+ builder.requestId(payload.getRequestId());
+ builder.topic(configuration.getTopic());
+ builder.msgBody(getMsgBody(ctx, msg));
+ return Optional.of(new MqttActionMsg(msg.getTenantId(),
+ msg.getCustomerId(),
+ msg.getDeviceId(),
+ builder.build()));
+ }
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginActionConfiguration.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginActionConfiguration.java
new file mode 100644
index 0000000000..94deb511a0
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/action/MqttPluginActionConfiguration.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.action;
+
+import lombok.Data;
+import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration;
+
+@Data
+public class MqttPluginActionConfiguration implements TemplateActionConfiguration {
+ private boolean sync;
+ private String topic;
+ private String template;
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttMsgHandler.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttMsgHandler.java
new file mode 100644
index 0000000000..08e50598ba
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttMsgHandler.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.plugin;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.thingsboard.server.common.data.id.RuleId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
+import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
+import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
+import org.thingsboard.server.extensions.api.rules.RuleException;
+import org.thingsboard.server.extensions.mqtt.action.MqttActionMsg;
+import org.thingsboard.server.extensions.mqtt.action.MqttActionPayload;
+
+import java.nio.charset.StandardCharsets;
+
+@RequiredArgsConstructor
+@Slf4j
+public class MqttMsgHandler implements RuleMsgHandler {
+
+ private final MqttAsyncClient mqttClient;
+
+ @Override
+ public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg> msg) throws RuleException {
+ if (!(msg instanceof MqttActionMsg)) {
+ throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!");
+ }
+ MqttActionPayload payload = ((MqttActionMsg) msg).getPayload();
+ MqttMessage mqttMsg = new MqttMessage(payload.getMsgBody().getBytes(StandardCharsets.UTF_8));
+ try {
+ mqttClient.publish(payload.getTopic(), mqttMsg, null, new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken asyncActionToken) {
+ log.debug("Message [{}] was successfully delivered to topic [{}]!", msg.toString(), payload.getTopic());
+ if (payload.isSync()) {
+ ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
+ BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
+ }
+ }
+ @Override
+ public void onFailure(IMqttToken asyncActionToken, Throwable e) {
+ log.warn("Failed to deliver message [{}] to topic [{}]!", msg.toString(), payload.getTopic());
+ if (payload.isSync()) {
+ ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
+ BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), new Exception(e))));
+ }
+ }
+ });
+ } catch (MqttException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPlugin.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPlugin.java
new file mode 100644
index 0000000000..3ff4dd0a59
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPlugin.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.plugin;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.paho.client.mqttv3.*;
+import org.thingsboard.server.extensions.api.component.Plugin;
+import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
+import org.thingsboard.server.extensions.api.plugins.PluginContext;
+import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
+import org.thingsboard.server.extensions.mqtt.action.MqttPluginAction;
+
+import java.util.UUID;
+
+@Plugin(name = "Mqtt Plugin", actions = {MqttPluginAction.class},
+ descriptor = "MqttPluginDescriptor.json", configuration = MqttPluginConfiguration.class)
+@Slf4j
+public class MqttPlugin extends AbstractPlugin {
+
+ private MqttMsgHandler handler;
+
+ private MqttAsyncClient mqttClient;
+ private MqttConnectOptions mqttClientOptions;
+
+ private int retryInterval;
+
+ private final Object connectLock = new Object();
+
+ @Override
+ public void init(MqttPluginConfiguration configuration) {
+ retryInterval = configuration.getRetryInterval();
+
+ mqttClientOptions = new MqttConnectOptions();
+ mqttClientOptions.setCleanSession(false);
+ mqttClientOptions.setMaxInflight(configuration.getMaxInFlight());
+ mqttClientOptions.setAutomaticReconnect(true);
+ String clientId = configuration.getClientId();
+ if (StringUtils.isEmpty(clientId)) {
+ clientId = UUID.randomUUID().toString();
+ }
+ if (!StringUtils.isEmpty(configuration.getAccessToken())) {
+ mqttClientOptions.setUserName(configuration.getAccessToken());
+ }
+ try {
+ mqttClient = new MqttAsyncClient("tcp://" + configuration.getHost() + ":" + configuration.getPort(), clientId);
+ } catch (Exception e) {
+ log.error("Failed to create mqtt client", e);
+ throw new RuntimeException(e);
+ }
+ connect();
+ }
+
+ private void connect() {
+ if (!mqttClient.isConnected()) {
+ synchronized (connectLock) {
+ while (!mqttClient.isConnected()) {
+ log.debug("Attempt to connect to requested mqtt host [{}]!", mqttClient.getServerURI());
+ try {
+ mqttClient.connect(mqttClientOptions, null, new IMqttActionListener() {
+ @Override
+ public void onSuccess(IMqttToken iMqttToken) {
+ log.info("Connected to requested mqtt host [{}]!", mqttClient.getServerURI());
+ }
+
+ @Override
+ public void onFailure(IMqttToken iMqttToken, Throwable e) {
+ }
+ }).waitForCompletion();
+ } catch (MqttException e) {
+ log.warn("Failed to connect to requested mqtt host [{}]!", mqttClient.getServerURI(), e);
+ if (!mqttClient.isConnected()) {
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e1) {
+ log.trace("Failed to wait for retry interval!", e);
+ }
+ }
+ }
+ }
+ }
+ }
+ this.handler = new MqttMsgHandler(mqttClient);
+ }
+
+ private void destroy() {
+ try {
+ this.handler = null;
+ this.mqttClient.disconnect();
+ } catch (MqttException e) {
+ log.error("Failed to close mqtt client connection during destroy()", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected RuleMsgHandler getRuleMsgHandler() {
+ return handler;
+ }
+
+ @Override
+ public void resume(PluginContext ctx) {
+ connect();
+ }
+
+ @Override
+ public void suspend(PluginContext ctx) {
+ destroy();
+ }
+
+ @Override
+ public void stop(PluginContext ctx) {
+ destroy();
+ }
+}
diff --git a/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPluginConfiguration.java b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPluginConfiguration.java
new file mode 100644
index 0000000000..4d50877482
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/java/org/thingsboard/server/extensions/mqtt/plugin/MqttPluginConfiguration.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright © 2016-2017 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.extensions.mqtt.plugin;
+
+import lombok.Data;
+
+@Data
+public class MqttPluginConfiguration {
+ private String host;
+ private int port;
+ private int maxInFlight;
+ private int retryInterval;
+ private String clientId;
+ private String accessToken;
+}
\ No newline at end of file
diff --git a/extensions/extension-mqtt/src/main/resources/MqttActionDescriptor.json b/extensions/extension-mqtt/src/main/resources/MqttActionDescriptor.json
new file mode 100644
index 0000000000..05b9b3ac37
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/resources/MqttActionDescriptor.json
@@ -0,0 +1,32 @@
+{
+ "schema": {
+ "title": "Mqtt Action Configuration",
+ "type": "object",
+ "properties": {
+ "sync": {
+ "title": "Requires delivery confirmation",
+ "type": "boolean"
+ },
+ "topic": {
+ "title": "Topic Name",
+ "type": "string"
+ },
+ "template": {
+ "title": "Body Template",
+ "type": "string"
+ }
+ },
+ "required": [
+ "topic",
+ "template"
+ ]
+ },
+ "form": [
+ "topic",
+ {
+ "key": "template",
+ "type": "textarea",
+ "rows": 5
+ }
+ ]
+}
\ No newline at end of file
diff --git a/extensions/extension-mqtt/src/main/resources/MqttPluginDescriptor.json b/extensions/extension-mqtt/src/main/resources/MqttPluginDescriptor.json
new file mode 100644
index 0000000000..dfd781afff
--- /dev/null
+++ b/extensions/extension-mqtt/src/main/resources/MqttPluginDescriptor.json
@@ -0,0 +1,48 @@
+{
+ "schema": {
+ "title": "Mqtt Plugin Configuration",
+ "type": "object",
+ "properties": {
+ "host": {
+ "title": "Specify the host to connect to",
+ "type": "string",
+ "default": "localhost"
+ },
+ "port": {
+ "title": "Connect to the port specified",
+ "type": "integer",
+ "default": 1883
+ },
+ "accessToken": {
+ "title": "Username (Access Token) to be used for authenticating.",
+ "type": "string"
+ },
+ "clientId": {
+ "title": "The id to use for this client.",
+ "type": "string"
+ },
+ "maxInFlight": {
+ "title": "How many messages can be send without acknowledgments.",
+ "type": "integer",
+ "default": 1000
+ },
+ "retryInterval": {
+ "title": "Interval to wait between connect attempts to host.",
+ "type": "integer",
+ "default": 3000
+ }
+ },
+ "required": [
+ "host",
+ "port"
+ ]
+ },
+ "form": [
+ "host",
+ "port",
+ "accessToken",
+ "clientId",
+ "maxInFlight",
+ "retryInterval"
+ ]
+}
\ No newline at end of file
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 2c1fdfe388..fbd2f2e63b 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -38,6 +38,7 @@
extension-rabbitmq
extension-rest-api-call
extension-kafka
+ extension-mqtt
diff --git a/pom.xml b/pom.xml
index 5c16cb60de..7b2c5f8c34 100755
--- a/pom.xml
+++ b/pom.xml
@@ -340,6 +340,12 @@
extension
${project.version}
+
+ org.thingsboard.extensions
+ extension-mqtt
+ extension
+ ${project.version}
+
org.thingsboard.common
data