created azure iot hub rule node

This commit is contained in:
YevhenBondarenko 2020-07-17 20:15:54 +03:00 committed by Andrew Shvayka
parent 557d392847
commit a84160a153
8 changed files with 362 additions and 23 deletions

View File

@ -0,0 +1,22 @@
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIEAgAAuTANBgkqhkiG9w0BAQUFADBaMQswCQYDVQQGEwJJ
RTESMBAGA1UEChMJQmFsdGltb3JlMRMwEQYDVQQLEwpDeWJlclRydXN0MSIwIAYD
VQQDExlCYWx0aW1vcmUgQ3liZXJUcnVzdCBSb290MB4XDTAwMDUxMjE4NDYwMFoX
DTI1MDUxMjIzNTkwMFowWjELMAkGA1UEBhMCSUUxEjAQBgNVBAoTCUJhbHRpbW9y
ZTETMBEGA1UECxMKQ3liZXJUcnVzdDEiMCAGA1UEAxMZQmFsdGltb3JlIEN5YmVy
VHJ1c3QgUm9vdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKMEuyKr
mD1X6CZymrV51Cni4eiVgLGw41uOKymaZN+hXe2wCQVt2yguzmKiYv60iNoS6zjr
IZ3AQSsBUnuId9Mcj8e6uYi1agnnc+gRQKfRzMpijS3ljwumUNKoUMMo6vWrJYeK
mpYcqWe4PwzV9/lSEy/CG9VwcPCPwBLKBsua4dnKM3p31vjsufFoREJIE9LAwqSu
XmD+tqYF/LTdB1kC1FkYmGP1pWPgkAx9XbIGevOF6uvUA65ehD5f/xXtabz5OTZy
dc93Uk3zyZAsuT3lySNTPx8kmCFcB5kpvcY67Oduhjprl3RjM71oGDHweI12v/ye
jl0qhqdNkNwnGjkCAwEAAaNFMEMwHQYDVR0OBBYEFOWdWTCCR1jMrPoIVDaGezq1
BE3wMBIGA1UdEwEB/wQIMAYBAf8CAQMwDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
DQEBBQUAA4IBAQCFDF2O5G9RaEIFoN27TyclhAO992T9Ldcw46QQF+vaKSm2eT92
9hkTI7gQCvlYpNRhcL0EYWoSihfVCr3FvDB81ukMJY2GQE/szKN+OMY3EU/t3Wgx
jkzSswF07r51XgdIGn9w/xZchMB5hbgF/X++ZRGjD8ACtPhSNzkE1akxehi/oCr0
Epn3o0WC4zxe9Z2etciefC7IpJ5OCBRLbf1wbWsaY71k5h+3zvDyny67G7fyUIhz
ksLi4xaNmjICq44Y3ekQEe5+NauQrz4wlHrQMz2nZQ/1/I6eYs9HRCwBXbsdtTLS
R9I4LtD+gdwyah617jzV/OeBHRnDJELqYzmp
-----END CERTIFICATE-----

View File

@ -0,0 +1,99 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import lombok.extern.slf4j.Slf4j;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;
@Slf4j
public final class AzureIotHubUtil {
private static final String BASE_DIR_PATH = System.getProperty("user.dir");
private static final String APP_DIR = "application";
private static final String SRC_DIR = "src";
private static final String MAIN_DIR = "main";
private static final String DATA_DIR = "data";
private static final String CERTS_DIR = "certs";
private static final String AZURE_DIR = "azure";
private static final String FILE_NAME = "BaltimoreCyberTrustRoot.crt.pem";
private static final Path FULL_FILE_PATH;
static {
if (BASE_DIR_PATH.endsWith("bin")) {
FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("bin$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
} else if (BASE_DIR_PATH.endsWith("conf")) {
FULL_FILE_PATH = Paths.get(BASE_DIR_PATH.replaceAll("conf$", ""), DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
} else {
FULL_FILE_PATH = Paths.get(BASE_DIR_PATH, APP_DIR, SRC_DIR, MAIN_DIR, DATA_DIR, CERTS_DIR, AZURE_DIR, FILE_NAME);
}
}
private static final long SAS_TOKEN_VALID_SECS = 365 * 24 * 60 * 60;
private static final long ONE_SECOND_IN_MILLISECONDS = 1000;
private static final String SAS_TOKEN_FORMAT = "SharedAccessSignature sr=%s&sig=%s&se=%s";
private static final String USERNAME_FORMAT = "%s/%s/?api-version=2018-06-30";
private AzureIotHubUtil() {
}
public static String buildUsername(String host, String deviceId) {
return String.format(USERNAME_FORMAT, host, deviceId);
}
public static String buildSasToken(String host, String sasKey) {
try {
final String targetUri = URLEncoder.encode(host.toLowerCase(), "UTF-8");
final long expiryTime = buildExpiresOn();
String toSign = targetUri + "\n" + expiryTime;
byte[] keyBytes = Base64.getDecoder().decode(sasKey.getBytes(StandardCharsets.UTF_8));
SecretKeySpec signingKey = new SecretKeySpec(keyBytes, "HmacSHA256");
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSign.getBytes(StandardCharsets.UTF_8));
String signature = URLEncoder.encode(Base64.getEncoder().encodeToString(rawHmac), "UTF-8");
return String.format(SAS_TOKEN_FORMAT, targetUri, signature, expiryTime);
} catch (Exception e) {
throw new RuntimeException("Failed to build SAS token!!!", e);
}
}
private static long buildExpiresOn() {
long expiresOnDate = System.currentTimeMillis();
expiresOnDate += SAS_TOKEN_VALID_SECS * ONE_SECOND_IN_MILLISECONDS;
return expiresOnDate / ONE_SECOND_IN_MILLISECONDS;
}
public static String getDefaultCaCert() {
try {
return new String(Files.readAllBytes(FULL_FILE_PATH));
} catch (IOException e) {
log.error("Failed to load Default CaCert file!!! [{}]", FULL_FILE_PATH.toString());
throw new RuntimeException("Failed to load Default CaCert file!!!");
}
}
}

View File

@ -16,8 +16,6 @@
package org.thingsboard.rule.engine.mqtt;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
@ -36,7 +34,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import javax.net.ssl.SSLException;
import java.nio.charset.Charset;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -57,14 +54,14 @@ public class TbMqttNode implements TbNode {
private static final String ERROR = "error";
private TbMqttNodeConfiguration config;
protected TbMqttNodeConfiguration mqttNodeConfiguration;
private MqttClient mqttClient;
protected MqttClient mqttClient;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
try {
this.config = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
this.mqttClient = initClient(ctx);
} catch (Exception e) {
throw new TbNodeException(e);
@ -73,7 +70,7 @@ public class TbMqttNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
String topic = TbNodeUtils.processPattern(this.config.getTopicPattern(), msg.getMetaData());
String topic = TbNodeUtils.processPattern(this.mqttNodeConfiguration.getTopicPattern(), msg.getMetaData());
this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE)
.addListener(future -> {
if (future.isSuccess()) {
@ -99,38 +96,38 @@ public class TbMqttNode implements TbNode {
}
}
private MqttClient initClient(TbContext ctx) throws Exception {
protected MqttClient initClient(TbContext ctx) throws Exception {
Optional<SslContext> sslContextOpt = initSslContext();
MqttClientConfig config = sslContextOpt.isPresent() ? new MqttClientConfig(sslContextOpt.get()) : new MqttClientConfig();
if (!StringUtils.isEmpty(this.config.getClientId())) {
config.setClientId(this.config.getClientId());
if (!StringUtils.isEmpty(this.mqttNodeConfiguration.getClientId())) {
config.setClientId(this.mqttNodeConfiguration.getClientId());
}
config.setCleanSession(this.config.isCleanSession());
this.config.getCredentials().configure(config);
config.setCleanSession(this.mqttNodeConfiguration.isCleanSession());
this.mqttNodeConfiguration.getCredentials().configure(config);
MqttClient client = MqttClient.create(config, null);
client.setEventLoop(ctx.getSharedEventLoop());
Future<MqttConnectResult> connectFuture = client.connect(this.config.getHost(), this.config.getPort());
Future<MqttConnectResult> connectFuture = client.connect(this.mqttNodeConfiguration.getHost(), this.mqttNodeConfiguration.getPort());
MqttConnectResult result;
try {
result = connectFuture.get(this.config.getConnectTimeoutSec(), TimeUnit.SECONDS);
result = connectFuture.get(this.mqttNodeConfiguration.getConnectTimeoutSec(), TimeUnit.SECONDS);
} catch (TimeoutException ex) {
connectFuture.cancel(true);
client.disconnect();
String hostPort = this.config.getHost() + ":" + this.config.getPort();
String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s.", hostPort));
}
if (!result.isSuccess()) {
connectFuture.cancel(true);
client.disconnect();
String hostPort = this.config.getHost() + ":" + this.config.getPort();
String hostPort = this.mqttNodeConfiguration.getHost() + ":" + this.mqttNodeConfiguration.getPort();
throw new RuntimeException(String.format("Failed to connect to MQTT broker at %s. Result code is: %s", hostPort, result.getReturnCode()));
}
return client;
}
private Optional<SslContext> initSslContext() throws SSLException {
Optional<SslContext> result = this.config.getCredentials().initSslContext();
if (this.config.isSsl() && !result.isPresent()) {
Optional<SslContext> result = this.mqttNodeConfiguration.getCredentials().initSslContext();
if (this.mqttNodeConfiguration.isSsl() && !result.isPresent()) {
result = Optional.of(SslContextBuilder.forClient().build());
}
return result;

View File

@ -0,0 +1,91 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.mqtt.azure;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.thingsboard.common.util.AzureIotHubUtil;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
import javax.net.ssl.TrustManagerFactory;
import java.io.ByteArrayInputStream;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Optional;
@Data
@Slf4j
@JsonIgnoreProperties(ignoreUnknown = true)
public class AzureIotHubSasCredentials implements MqttClientCredentials {
private String sasKey;
private String caCert;
@Override
public Optional<SslContext> initSslContext() {
try {
Security.addProvider(new BouncyCastleProvider());
if (caCert == null || caCert.isEmpty()) {
caCert = AzureIotHubUtil.getDefaultCaCert();
}
return Optional.of(SslContextBuilder.forClient()
.trustManager(createAndInitTrustManagerFactory())
.clientAuth(ClientAuth.REQUIRE)
.build());
} catch (Exception e) {
log.error("[{}] Creating TLS factory failed!", caCert, e);
throw new RuntimeException("Creating TLS factory failed!", e);
}
}
@Override
public void configure(MqttClientConfig config) {
}
private TrustManagerFactory createAndInitTrustManagerFactory() throws Exception {
X509Certificate caCertHolder;
caCertHolder = readCertFile(caCert);
KeyStore caKeyStore = KeyStore.getInstance(KeyStore.getDefaultType());
caKeyStore.load(null, null);
caKeyStore.setCertificateEntry("caCert-cert", caCertHolder);
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(caKeyStore);
return trustManagerFactory;
}
private X509Certificate readCertFile(String fileContent) throws Exception {
X509Certificate certificate = null;
if (fileContent != null && !fileContent.trim().isEmpty()) {
fileContent = fileContent.replace("-----BEGIN CERTIFICATE-----", "")
.replace("-----END CERTIFICATE-----", "")
.replaceAll("\\s", "");
byte[] decoded = Base64.decodeBase64(fileContent);
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
certificate = (X509Certificate) certFactory.generateCertificate(new ByteArrayInputStream(decoded));
}
return certificate;
}
}

View File

@ -0,0 +1,88 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.mqtt.azure;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.AzureIotHubUtil;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.mqtt.TbMqttNode;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
import org.thingsboard.rule.engine.mqtt.credentials.CertPemClientCredentials;
import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
import org.thingsboard.server.common.data.plugin.ComponentType;
import java.util.Optional;
@Slf4j
@RuleNode(
type = ComponentType.EXTERNAL,
name = "azure iot hub",
configClazz = TbAzureIotHubNodeConfiguration.class,
nodeDescription = "Publish messages to the Azure IoT Hub",
nodeDetails = "Will publish message payload to the Azure IoT Hub with QoS <b>AT_LEAST_ONCE</b>.",
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
configDirective = "tbActionNodeAzureIotHubConfig"
)
public class TbAzureIotHubNode extends TbMqttNode {
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
try {
this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class);
mqttNodeConfiguration.setPort(8883);
mqttNodeConfiguration.setCleanSession(true);
MqttClientCredentials credentials = mqttNodeConfiguration.getCredentials();
mqttNodeConfiguration.setCredentials(new MqttClientCredentials() {
@Override
public Optional<SslContext> initSslContext() {
if (credentials instanceof AzureIotHubSasCredentials) {
AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
if (sasCredentials.getCaCert() == null || sasCredentials.getCaCert().isEmpty()) {
sasCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
}
} else if (credentials instanceof CertPemClientCredentials) {
CertPemClientCredentials pemCredentials = (CertPemClientCredentials) credentials;
if (pemCredentials.getCaCert() == null || pemCredentials.getCaCert().isEmpty()) {
pemCredentials.setCaCert(AzureIotHubUtil.getDefaultCaCert());
}
}
return credentials.initSslContext();
}
@Override
public void configure(MqttClientConfig config) {
config.setProtocolVersion(MqttVersion.MQTT_3_1_1);
config.setUsername(AzureIotHubUtil.buildUsername(mqttNodeConfiguration.getHost(), config.getClientId()));
if (credentials instanceof AzureIotHubSasCredentials) {
AzureIotHubSasCredentials sasCredentials = (AzureIotHubSasCredentials) credentials;
config.setPassword(AzureIotHubUtil.buildSasToken(mqttNodeConfiguration.getHost(), sasCredentials.getSasKey()));
// config.setPassword("SharedAccessSignature sr=TBIoT2.azure-devices.net%2Fdevices%2Fdevice&sig=gTu9ZBFydojRXpCWC0fq3C6vfC%2FBevULhdsy4CzWa0Y%3D&se=1594986116");
}
}
});
this.mqttClient = initClient(ctx);
} catch (Exception e) {
throw new TbNodeException(e);
} }
}

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.mqtt.azure;
import lombok.Data;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.mqtt.TbMqttNodeConfiguration;
import org.thingsboard.rule.engine.mqtt.credentials.AnonymousCredentials;
import org.thingsboard.rule.engine.mqtt.credentials.MqttClientCredentials;
@Data
public class TbAzureIotHubNodeConfiguration extends TbMqttNodeConfiguration {
@Override
public TbAzureIotHubNodeConfiguration defaultConfiguration() {
TbAzureIotHubNodeConfiguration configuration = new TbAzureIotHubNodeConfiguration();
configuration.setTopicPattern("devices/<device_id>/messages/events/");
configuration.setHost("<iot-hub-name>.azure-devices.net");
configuration.setPort(8883);
configuration.setConnectTimeoutSec(10);
configuration.setCleanSession(true);
configuration.setSsl(true);
configuration.setCredentials(new AzureIotHubSasCredentials());
return configuration;
}
}

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.netty.handler.ssl.SslContext;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.rule.engine.mqtt.azure.AzureIotHubSasCredentials;
import java.util.Optional;
@ -29,6 +30,7 @@ import java.util.Optional;
@JsonSubTypes({
@JsonSubTypes.Type(value = AnonymousCredentials.class, name = "anonymous"),
@JsonSubTypes.Type(value = BasicCredentials.class, name = "basic"),
@JsonSubTypes.Type(value = AzureIotHubSasCredentials.class, name = "sas"),
@JsonSubTypes.Type(value = CertPemClientCredentials.class, name = "cert.PEM")})
public interface MqttClientCredentials {