Added MQTT plugin

This commit is contained in:
volodymyr-babak 2017-05-15 15:52:17 +03:00
parent c3ce0d16a3
commit cd4940ce6d
15 changed files with 582 additions and 2 deletions

View File

@ -91,8 +91,6 @@
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
@ -422,6 +420,11 @@
<artifactId>extension-kafka</artifactId>
<classifier>extension</classifier>
</artifactItem>
<artifactItem>
<groupId>org.thingsboard.extensions</groupId>
<artifactId>extension-mqtt</artifactId>
<classifier>extension</classifier>
</artifactItem>
</artifactItems>
</configuration>
</execution>

View File

@ -68,6 +68,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> {
this.producer.close();
} catch (Exception e) {
log.error("Failed to close producer during destroy()", e);
throw new RuntimeException(e);
}
}

View File

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>1.2.3-SNAPSHOT</version>
<artifactId>extensions</artifactId>
</parent>
<groupId>org.thingsboard.extensions</groupId>
<artifactId>extension-mqtt</artifactId>
<packaging>jar</packaging>
<name>Thingsboard Server MQTT Extension</name>
<url>http://thingsboard.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>extensions-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-tools</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/extension.xml</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
<!--
Copyright © 2016-2017 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
<id>extension</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.action;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg;
public class MqttActionMsg extends AbstractRuleToPluginMsg<MqttActionPayload> {
public MqttActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, MqttActionPayload payload) {
super(tenantId, customerId, deviceId, payload);
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.action;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.msg.session.MsgType;
import java.io.Serializable;
@Data
@Builder
public class MqttActionPayload implements Serializable {
private final boolean sync;
private final String topic;
private final String msgBody;
private final Integer requestId;
private final MsgType msgType;
}

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.action;
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
import org.thingsboard.server.extensions.api.component.Action;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleContext;
import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction;
import java.util.Optional;
@Action(name = "Mqtt Plugin Action", descriptor = "MqttActionDescriptor.json", configuration = MqttPluginActionConfiguration.class)
public class MqttPluginAction extends AbstractTemplatePluginAction<MqttPluginActionConfiguration> {
@Override
protected Optional<RuleToPluginMsg<?>> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) {
MqttActionPayload.MqttActionPayloadBuilder builder = MqttActionPayload.builder();
builder.sync(configuration.isSync());
builder.msgType(payload.getMsgType());
builder.requestId(payload.getRequestId());
builder.topic(configuration.getTopic());
builder.msgBody(getMsgBody(ctx, msg));
return Optional.of(new MqttActionMsg(msg.getTenantId(),
msg.getCustomerId(),
msg.getDeviceId(),
builder.build()));
}
}

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.action;
import lombok.Data;
import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration;
@Data
public class MqttPluginActionConfiguration implements TemplateActionConfiguration {
private boolean sync;
private String topic;
private String template;
}

View File

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.plugin;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.thingsboard.server.common.data.id.RuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg;
import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
import org.thingsboard.server.extensions.api.rules.RuleException;
import org.thingsboard.server.extensions.mqtt.action.MqttActionMsg;
import org.thingsboard.server.extensions.mqtt.action.MqttActionPayload;
import java.nio.charset.StandardCharsets;
@RequiredArgsConstructor
@Slf4j
public class MqttMsgHandler implements RuleMsgHandler {
private final MqttAsyncClient mqttClient;
@Override
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
if (!(msg instanceof MqttActionMsg)) {
throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!");
}
MqttActionPayload payload = ((MqttActionMsg) msg).getPayload();
MqttMessage mqttMsg = new MqttMessage(payload.getMsgBody().getBytes(StandardCharsets.UTF_8));
try {
mqttClient.publish(payload.getTopic(), mqttMsg, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.debug("Message [{}] was successfully delivered to topic [{}]!", msg.toString(), payload.getTopic());
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId())));
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable e) {
log.warn("Failed to deliver message [{}] to topic [{}]!", msg.toString(), payload.getTopic());
if (payload.isSync()) {
ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId,
BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), new Exception(e))));
}
}
});
} catch (MqttException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,128 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.plugin;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.thingsboard.server.extensions.api.component.Plugin;
import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
import org.thingsboard.server.extensions.api.plugins.PluginContext;
import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
import org.thingsboard.server.extensions.mqtt.action.MqttPluginAction;
import java.util.UUID;
@Plugin(name = "Mqtt Plugin", actions = {MqttPluginAction.class},
descriptor = "MqttPluginDescriptor.json", configuration = MqttPluginConfiguration.class)
@Slf4j
public class MqttPlugin extends AbstractPlugin<MqttPluginConfiguration> {
private MqttMsgHandler handler;
private MqttAsyncClient mqttClient;
private MqttConnectOptions mqttClientOptions;
private int retryInterval;
private final Object connectLock = new Object();
@Override
public void init(MqttPluginConfiguration configuration) {
retryInterval = configuration.getRetryInterval();
mqttClientOptions = new MqttConnectOptions();
mqttClientOptions.setCleanSession(false);
mqttClientOptions.setMaxInflight(configuration.getMaxInFlight());
mqttClientOptions.setAutomaticReconnect(true);
String clientId = configuration.getClientId();
if (StringUtils.isEmpty(clientId)) {
clientId = UUID.randomUUID().toString();
}
if (!StringUtils.isEmpty(configuration.getAccessToken())) {
mqttClientOptions.setUserName(configuration.getAccessToken());
}
try {
mqttClient = new MqttAsyncClient("tcp://" + configuration.getHost() + ":" + configuration.getPort(), clientId);
} catch (Exception e) {
log.error("Failed to create mqtt client", e);
throw new RuntimeException(e);
}
connect();
}
private void connect() {
if (!mqttClient.isConnected()) {
synchronized (connectLock) {
while (!mqttClient.isConnected()) {
log.debug("Attempt to connect to requested mqtt host [{}]!", mqttClient.getServerURI());
try {
mqttClient.connect(mqttClientOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
log.info("Connected to requested mqtt host [{}]!", mqttClient.getServerURI());
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable e) {
}
}).waitForCompletion();
} catch (MqttException e) {
log.warn("Failed to connect to requested mqtt host [{}]!", mqttClient.getServerURI(), e);
if (!mqttClient.isConnected()) {
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e1) {
log.trace("Failed to wait for retry interval!", e);
}
}
}
}
}
}
this.handler = new MqttMsgHandler(mqttClient);
}
private void destroy() {
try {
this.handler = null;
this.mqttClient.disconnect();
} catch (MqttException e) {
log.error("Failed to close mqtt client connection during destroy()", e);
throw new RuntimeException(e);
}
}
@Override
protected RuleMsgHandler getRuleMsgHandler() {
return handler;
}
@Override
public void resume(PluginContext ctx) {
connect();
}
@Override
public void suspend(PluginContext ctx) {
destroy();
}
@Override
public void stop(PluginContext ctx) {
destroy();
}
}

View File

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2017 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.extensions.mqtt.plugin;
import lombok.Data;
@Data
public class MqttPluginConfiguration {
private String host;
private int port;
private int maxInFlight;
private int retryInterval;
private String clientId;
private String accessToken;
}

View File

@ -0,0 +1,32 @@
{
"schema": {
"title": "Mqtt Action Configuration",
"type": "object",
"properties": {
"sync": {
"title": "Requires delivery confirmation",
"type": "boolean"
},
"topic": {
"title": "Topic Name",
"type": "string"
},
"template": {
"title": "Body Template",
"type": "string"
}
},
"required": [
"topic",
"template"
]
},
"form": [
"topic",
{
"key": "template",
"type": "textarea",
"rows": 5
}
]
}

View File

@ -0,0 +1,48 @@
{
"schema": {
"title": "Mqtt Plugin Configuration",
"type": "object",
"properties": {
"host": {
"title": "Specify the host to connect to",
"type": "string",
"default": "localhost"
},
"port": {
"title": "Connect to the port specified",
"type": "integer",
"default": 1883
},
"accessToken": {
"title": "Provide a username (accessToken) to be used for authenticating with the broker.",
"type": "string"
},
"clientId": {
"title": "The id to use for this client.",
"type": "string"
},
"maxInFlight": {
"title": "The max inflight limits to how many messages we can send without receiving acknowledgments.",
"type": "integer",
"default": 1000
},
"retryInterval": {
"title": "Interval to wait between connect attempts to host.",
"type": "integer",
"default": 3000
}
},
"required": [
"host",
"port"
]
},
"form": [
"host",
"port",
"accessToken",
"clientId",
"maxInFlight",
"retryInterval"
]
}

View File

@ -38,6 +38,7 @@
<module>extension-rabbitmq</module>
<module>extension-rest-api-call</module>
<module>extension-kafka</module>
<module>extension-mqtt</module>
</modules>
</project>

View File

@ -337,6 +337,12 @@
<classifier>extension</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.thingsboard.extensions</groupId>
<artifactId>extension-mqtt</artifactId>
<classifier>extension</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>data</artifactId>