From f191357b905f55fc7ec57adcdccff80470fa7aa5 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Fri, 9 Jun 2023 14:32:58 +0300 Subject: [PATCH 1/3] fixed NPE in Flow output node when it used after split array msg node --- .../thingsboard/server/common/msg/TbMsgProcessingCtx.java | 7 ++++++- .../rule/engine/transform/TbSplitArrayMsgNode.java | 6 ++++-- .../rule/engine/transform/TbSplitArrayMsgNodeTest.java | 1 + 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index 9010fc0b54..51cad0976f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -64,7 +64,12 @@ public final class TbMsgProcessingCtx implements Serializable { } public TbMsgProcessingStackItem pop() { - return !stack.isEmpty() ? stack.removeLast() : null; + if (stack == null) { + throw new RuntimeException("Stack is null!"); + } else if (stack.isEmpty()) { + return null; + } + return stack.removeLast(); } public static TbMsgProcessingCtx fromProto(MsgProtos.TbMsgProcessingCtxProto ctx) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNode.java index 7723d6028f..ae7ceade97 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNode.java @@ -77,8 +77,10 @@ public class TbSplitArrayMsgNode implements TbNode { ctx.tellFailure(msg, e); } }); - data.forEach(msgNode -> ctx.enqueueForTellNext(TbMsg.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(msgNode)), - TbRelationTypes.SUCCESS, wrapper::onSuccess, wrapper::onFailure)); + data.forEach(msgNode -> { + TbMsg outMsg = TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), JacksonUtil.toString(msgNode)); + ctx.enqueueForTellNext(outMsg, TbRelationTypes.SUCCESS, wrapper::onSuccess, wrapper::onFailure); + }); } } else { ctx.tellFailure(msg, new RuntimeException("Msg data is not a JSON Array!")); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNodeTest.java index 435ec91aaa..67ebf6f6fc 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbSplitArrayMsgNodeTest.java @@ -96,6 +96,7 @@ public class TbSplitArrayMsgNodeTest { ArgumentCaptor newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(ctx, never()).tellSuccess(any()); + verify(ctx, never()).enqueueForTellNext(any(), anyString(), any(), any()); verify(ctx, times(1)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture()); assertThat(exceptionCaptor.getValue()).isInstanceOf(RuntimeException.class); From e5a2712d8947b4b5c60433f5e8728700ed682ebc Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Mon, 12 Jun 2023 13:53:10 +0300 Subject: [PATCH 2/3] changed logic to ack msg if stack is null --- .../thingsboard/server/common/msg/TbMsgProcessingCtx.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index 51cad0976f..4e25ae17dc 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -64,12 +64,7 @@ public final class TbMsgProcessingCtx implements Serializable { } public TbMsgProcessingStackItem pop() { - if (stack == null) { - throw new RuntimeException("Stack is null!"); - } else if (stack.isEmpty()) { - return null; - } - return stack.removeLast(); + return stack == null || stack.isEmpty() ? null : stack.removeLast(); } public static TbMsgProcessingCtx fromProto(MsgProtos.TbMsgProcessingCtxProto ctx) { From a5de29c1f4731762332dd25284b47006cf76b224 Mon Sep 17 00:00:00 2001 From: ShvaykaD Date: Mon, 12 Jun 2023 15:50:49 +0300 Subject: [PATCH 3/3] code readability fix --- .../thingsboard/server/common/msg/TbMsgProcessingCtx.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java index 4e25ae17dc..1b1cbcdf54 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgProcessingCtx.java @@ -64,7 +64,10 @@ public final class TbMsgProcessingCtx implements Serializable { } public TbMsgProcessingStackItem pop() { - return stack == null || stack.isEmpty() ? null : stack.removeLast(); + if (stack == null || stack.isEmpty()) { + return null; + } + return stack.removeLast(); } public static TbMsgProcessingCtx fromProto(MsgProtos.TbMsgProcessingCtxProto ctx) {