From 54a344e1d6b3df1de89d39e1c9b00900b4c46729 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Tue, 8 May 2018 14:28:58 +0300 Subject: [PATCH] MQTT Rule Node --- pom.xml | 6 + rule-engine/rule-engine-components/pom.xml | 8 + .../rule/engine/aws/sqs/TbSqsNode.java | 7 +- .../rule/engine/mqtt/TbMqttNode.java | 144 ++++++++++++++++ .../engine/mqtt/TbMqttNodeConfiguration.java | 48 ++++++ .../credentials/AnonymousCredentials.java | 36 ++++ .../mqtt/credentials/BasicCredentials.java | 43 +++++ .../credentials/CertPemClientCredentials.java | 154 ++++++++++++++++++ .../credentials/MqttClientCredentials.java | 40 +++++ 9 files changed, 480 insertions(+), 6 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java diff --git a/pom.xml b/pom.xml index dc58627047..67d48b268b 100755 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,7 @@ 2.5.3 1.2.1 9.4.1211 + 2.0.0TB org/thingsboard/server/gen/**/*, org/thingsboard/server/extensions/core/plugin/telemetry/gen/**/* @@ -818,6 +819,11 @@ exe provided + + nl.jk5.netty-mqtt + netty-mqtt + ${netty-mqtt-client.version} + org.elasticsearch.client rest diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index 4a51c8c0f9..a38df289d3 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -100,6 +100,14 @@ com.rabbitmq amqp-client + + nl.jk5.netty-mqtt + netty-mqtt + + + org.bouncycastle + bcpkix-jdk15on + junit junit diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java index 42c7e4e56e..f5172c7cc3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java @@ -13,17 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.thingsboard.rule.engine.aws.sqs; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.sns.AmazonSNS; -import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.PublishRequest; -import com.amazonaws.services.sns.model.PublishResult; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.MessageAttributeValue; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java new file mode 100644 index 0000000000..0ceda9c250 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -0,0 +1,144 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt; + +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.Future; +import lombok.extern.slf4j.Slf4j; +import nl.jk5.mqtt.MqttClient; +import nl.jk5.mqtt.MqttClientConfig; +import nl.jk5.mqtt.MqttConnectResult; +import org.springframework.util.StringUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.*; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; + +import javax.net.ssl.SSLException; +import java.nio.charset.Charset; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "mqtt", + configClazz = TbMqttNodeConfiguration.class, + nodeDescription = "Publish messages to MQTT broker", + nodeDetails = "Expects messages with any message type. Will publish message to MQTT broker.", + uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"}, + configDirective = "tbActionNodeMqttConfig" +) +public class TbMqttNode implements TbNode { + + private static final Charset UTF8 = Charset.forName("UTF-8"); + + private static final String ERROR = "error"; + + private TbMqttNodeConfiguration config; + + private EventLoopGroup eventLoopGroup; + private MqttClient mqttClient; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + try { + this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); + this.eventLoopGroup = new NioEventLoopGroup(); + this.mqttClient = initClient(); + } catch (Exception e) { + throw new TbNodeException(e); + } + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { + String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData()); + this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE) + .addListener(future -> { + if (future.isSuccess()) { + TbMsg next = ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()); + ctx.tellNext(next, TbRelationTypes.SUCCESS); + } else { + TbMsg next = processException(ctx, msg, future.cause()); + ctx.tellNext(next, TbRelationTypes.FAILURE, future.cause()); + } + } + ); + } + + private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) { + TbMsgMetaData metaData = origMsg.getMetaData().copy(); + metaData.putValue(ERROR, e.getClass() + ": " + e.getMessage()); + return ctx.transformMsg(origMsg, origMsg.getType(), origMsg.getOriginator(), metaData, origMsg.getData()); + } + + @Override + public void destroy() { + if (this.mqttClient != null) { + this.mqttClient.disconnect(); + } + if (this.eventLoopGroup != null) { + this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS); + } + } + + private MqttClient initClient() throws Exception { + Optional sslContextOpt = initSslContext(); + MqttClientConfig config = sslContextOpt.isPresent() ? new MqttClientConfig(sslContextOpt.get()) : new MqttClientConfig(); + if (!StringUtils.isEmpty(this.config.getClientId())) { + config.setClientId(this.config.getClientId()); + } + this.config.getCredentials().configure(config); + MqttClient client = MqttClient.create(config); + client.setEventLoop(this.eventLoopGroup); + Future connectFuture = client.connect(this.config.getHost(), this.config.getPort()); + MqttConnectResult result; + try { + result = connectFuture.get(this.config.getConnectTimeoutSec(), TimeUnit.SECONDS); + } catch (TimeoutException ex) { + connectFuture.cancel(true); + client.disconnect(); + String hostPort = this.config.getHost() + ":" + this.config.getPort(); + throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort)); + } + if (!result.isSuccess()) { + connectFuture.cancel(true); + client.disconnect(); + String hostPort = this.config.getHost() + ":" + this.config.getPort(); + throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode())); + } + return client; + } + + private Optional initSslContext() throws SSLException { + Optional result = this.config.getCredentials().initSslContext(); + if (this.config.isSsl() && !result.isPresent()) { + result = Optional.of(SslContextBuilder.forClient().build()); + } + return result; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java new file mode 100644 index 0000000000..f63e20143e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNodeConfiguration.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.rule.engine.mqtt.credentials.AnonymousCredentials; +import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials; + +@Data +public class TbMqttNodeConfiguration implements NodeConfiguration { + + private String topicPattern; + private String host; + private int port; + private int connectTimeoutSec; + private String clientId; + + private boolean ssl; + private MqttClientCredentials credentials; + + @Override + public TbMqttNodeConfiguration defaultConfiguration() { + TbMqttNodeConfiguration configuration = new TbMqttNodeConfiguration(); + configuration.setTopicPattern("my-topic"); + configuration.setHost("localhost"); + configuration.setPort(1883); + configuration.setConnectTimeoutSec(10); + configuration.setSsl(false); + configuration.setCredentials(new AnonymousCredentials()); + return configuration; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java new file mode 100644 index 0000000000..9d5e8df9b5 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/AnonymousCredentials.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt.credentials; + +import io.netty.handler.ssl.SslContext; +import nl.jk5.mqtt.MqttClientConfig; + +import java.util.Optional; + +public class AnonymousCredentials implements MqttClientCredentials { + + @Override + public Optional initSslContext() { + return Optional.empty(); + } + + @Override + public void configure(MqttClientConfig config) { + + } +} + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java new file mode 100644 index 0000000000..cbbd7036a2 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/BasicCredentials.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt.credentials; + +import io.netty.handler.ssl.SslContext; +import lombok.Data; +import nl.jk5.mqtt.MqttClientConfig; + +import java.util.Optional; + +@Data +public class BasicCredentials implements MqttClientCredentials { + + private String username; + private String password; + + @Override + public Optional initSslContext() { + return Optional.empty(); + } + + @Override + public void configure(MqttClientConfig config) { + config.setUsername(username); + config.setPassword(password); + } + +} + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java new file mode 100644 index 0000000000..c9fb4a32a9 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/CertPemClientCredentials.java @@ -0,0 +1,154 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt.credentials; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import nl.jk5.mqtt.MqttClientConfig; +import org.apache.commons.codec.binary.Base64; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openssl.PEMDecryptorProvider; +import org.bouncycastle.openssl.PEMEncryptedKeyPair; +import org.bouncycastle.openssl.PEMKeyPair; +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; +import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; +import org.springframework.util.StringUtils; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import java.io.ByteArrayInputStream; +import java.security.*; +import java.security.cert.Certificate; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.security.interfaces.RSAPrivateKey; +import java.security.spec.PKCS8EncodedKeySpec; +import java.util.Optional; + +@Data +@Slf4j +@JsonIgnoreProperties(ignoreUnknown = true) +public class CertPemClientCredentials implements MqttClientCredentials { + + private static final String TLS_VERSION = "TLSv1.2"; + + private String caCert; + private String cert; + private String privateKey; + private String password; + + @Override + public Optional initSslContext() { + try { + Security.addProvider(new BouncyCastleProvider()); + return Optional.of(SslContextBuilder.forClient() + .keyManager(createAndInitKeyManagerFactory()) + .trustManager(createAndInitTrustManagerFactory()) + .clientAuth(ClientAuth.REQUIRE) + .build()); + } catch (Exception e) { + log.error("[{}:{}] Creating TLS factory failed!", caCert, cert, e); + throw new RuntimeException("Creating TLS factory failed!", e); + } + } + + @Override + public void configure(MqttClientConfig config) { + + } + + private KeyManagerFactory createAndInitKeyManagerFactory() throws Exception { + X509Certificate certHolder = readCertFile(cert); + Object keyObject = readPrivateKeyFile(privateKey); + char[] passwordCharArray = "".toCharArray(); + if (!StringUtils.isEmpty(password)) { + passwordCharArray = password.toCharArray(); + } + + JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC"); + + PrivateKey privateKey; + if (keyObject instanceof PEMEncryptedKeyPair) { + PEMDecryptorProvider provider = new JcePEMDecryptorProviderBuilder().build(passwordCharArray); + KeyPair key = keyConverter.getKeyPair(((PEMEncryptedKeyPair) keyObject).decryptKeyPair(provider)); + privateKey = key.getPrivate(); + } else if (keyObject instanceof PEMKeyPair) { + KeyPair key = keyConverter.getKeyPair((PEMKeyPair) keyObject); + privateKey = key.getPrivate(); + } else if (keyObject instanceof PrivateKey) { + privateKey = (PrivateKey)keyObject; + } else { + throw new RuntimeException("Unable to get private key from object: " + keyObject.getClass()); + } + + KeyStore clientKeyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + clientKeyStore.load(null, null); + clientKeyStore.setCertificateEntry("cert", certHolder); + clientKeyStore.setKeyEntry("private-key", + privateKey, + passwordCharArray, + new Certificate[]{certHolder}); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(clientKeyStore, passwordCharArray); + return keyManagerFactory; + } + + private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception { + X509Certificate caCertHolder; + caCertHolder = readCertFile(caCert); + + KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType()); + caKeyStore.load(null, null); + caKeyStore.setCertificateEntry("caCert-cert", caCertHolder); + + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(caKeyStore); + return trustManagerFactory; + } + + private X509Certificate readCertFile(String fileContent) throws Exception { + X509Certificate certificate = null; + if (fileContent != null && !fileContent.trim().isEmpty()) { + fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "") + .replace("-----END CERTIFICATE-----", "") + .replaceAll("\\s", ""); + byte[] decoded = Base64.decodeBase64(fileContent); + CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); + certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded)); + } + return certificate; + } + + private PrivateKey readPrivateKeyFile(String fileContent) throws Exception { + RSAPrivateKey privateKey = null; + if (fileContent != null && !fileContent.isEmpty()) { + fileContent = fileContent.replaceAll(".*BEGIN.*PRIVATE KEY.*", "") + .replaceAll(".*END.*PRIVATE KEY.*", "") + .replaceAll("\\s", ""); + byte[] decoded = Base64.decodeBase64(fileContent); + KeyFactory keyFactory = KeyFactory.getInstance("RSA"); + privateKey = (RSAPrivateKey) keyFactory.generatePrivate(new PKCS8EncodedKeySpec(decoded)); + } + return privateKey; + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java new file mode 100644 index 0000000000..5c4594ff44 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/credentials/MqttClientCredentials.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.thingsboard.rule.engine.mqtt.credentials; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.netty.handler.ssl.SslContext; +import nl.jk5.mqtt.MqttClientConfig; + +import java.util.Optional; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = AnonymousCredentials.class, name = "anonymous"), + @JsonSubTypes.Type(value = BasicCredentials.class, name = "basic"), + @JsonSubTypes.Type(value = CertPemClientCredentials.class, name = "cert.PEM")}) +public interface MqttClientCredentials { + + Optional initSslContext(); + + void configure(MqttClientConfig config); +} +