From a3b005e45a01b54044a4780fa36b263712364c0e Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 6 Oct 2020 17:11:21 +0300 Subject: [PATCH 1/4] added queueName to enqueueForTellNext in TbSendRPCRequestNode --- .../org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 2bd7f315fe..df3ed752b3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -113,7 +113,7 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); - ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); + ctx.enqueueForTellNext(next, next.getQueueName(), TbRelationTypes.SUCCESS, null, null); } else { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); From 94bb9f5be3c3ca878b9ed5d6b95a7e3156c29ea0 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 6 Oct 2020 18:59:18 +0300 Subject: [PATCH 2/4] update the fix in the API --- .../server/actors/ruleChain/DefaultTbContext.java | 10 +++++----- .../rule/engine/rpc/TbSendRPCRequestNode.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index cf49982f9d..42993273e2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -138,31 +138,31 @@ class DefaultTbContext implements TbContext { @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index df3ed752b3..2bd7f315fe 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -113,7 +113,7 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (!ruleEngineDeviceRpcResponse.getError().isPresent()) { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); - ctx.enqueueForTellNext(next, next.getQueueName(), TbRelationTypes.SUCCESS, null, null); + ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else { TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.tellFailure(next, new RuntimeException(ruleEngineDeviceRpcResponse.getError().get().name())); From 7cfa97ab5edb2f7e416b6a90c3b8221cb6011803 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Tue, 6 Oct 2020 19:04:54 +0300 Subject: [PATCH 3/4] refactoring --- .../actors/ruleChain/DefaultTbContext.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 42993273e2..4fef96c000 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -66,7 +66,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import java.util.Collections; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -121,7 +120,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -138,46 +137,54 @@ class DefaultTbContext implements TbContext { @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + } + + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { + return resolvePartition(tbMsg, tbMsg.getQueueName()); + } + private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); From de9cd8939e12511d4e9e14470813a4d47a3379e2 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 7 Oct 2020 12:06:24 +0300 Subject: [PATCH 4/4] Fix for client certificate check --- application/src/main/resources/thingsboard.yml | 2 ++ .../server/transport/mqtt/MqttTransportContext.java | 4 ++++ .../server/transport/mqtt/MqttTransportHandler.java | 3 +++ transport/mqtt/src/main/resources/tb-mqtt-transport.yml | 2 ++ 4 files changed, 11 insertions(+) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 765747c5ea..4cefe63e54 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -569,6 +569,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" # Local CoAP transport parameters coap: # Enable/disable coap transport protocol. diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index b058a1c260..285e252e46 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -46,6 +46,10 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; + @Getter + @Value("${transport.mqtt.netty.skip_validity_check_for_client_cert:false}") + private boolean skipValidityCheckForClientCert; + @Getter @Setter private SslHandler sslHandler; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e3861a51b2..5d24f812eb 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -383,6 +383,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { try { + if(!context.isSkipValidityCheckForClientCert()){ + cert.checkValidity(); + } String strCert = SslUtil.getX509CertificateString(cert); String sha3Hash = EncryptionUtil.getSha3Hash(strCert); transportService.process(ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index bc18112c8e..ba8d3f1ad1 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -67,6 +67,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"