diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a51f500f82..a9ddaea2e4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext { @Override public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueue(tpi, tbMsg, onFailure, onSuccess); } @@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext { @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); } @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); } + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); + } + + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { + return resolvePartition(tbMsg, tbMsg.getQueueName()); + } + private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 35808b1f85..4ac7a9e647 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -605,6 +605,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" # Local CoAP transport parameters coap: # Enable/disable coap transport protocol. diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index 0d06f53ac6..6695fa24ac 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -51,6 +51,10 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.netty.max_payload_size}") private Integer maxPayloadSize; + @Getter + @Value("${transport.mqtt.netty.skip_validity_check_for_client_cert:false}") + private boolean skipValidityCheckForClientCert; + @Getter @Setter private SslHandler sslHandler; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 46a4934fa6..6a09bdda40 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -387,7 +387,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { try { - cert.checkValidity(new Date()); + if(!context.isSkipValidityCheckForClientCert()){ + cert.checkValidity(); + } String strCert = SslUtil.getX509CertificateString(cert); String sha3Hash = EncryptionUtil.getSha3Hash(strCert); transportService.process(DeviceTransportType.MQTT, ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 5f011f48e4..f01b15c77a 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -66,6 +66,8 @@ transport: key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" + # Skip certificate validity check for client certificates. + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"