diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 66fe935d68..9456e710df 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -169,7 +169,9 @@ class DefaultTbContext implements TbContext { private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer onFailure, Runnable onSuccess) { if (!tbMsg.isValid()) { log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg); - onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); + if (onFailure != null) { + onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); + } return; } TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() @@ -242,7 +244,9 @@ class DefaultTbContext implements TbContext { private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { if (!source.isValid()) { log.trace("[{}] Skip invalid message: {}", getTenantId(), source); - onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); + if (onFailure != null) { + onFailure.accept(new IllegalArgumentException("Source message is no longer valid!")); + } return; } RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();