From 50d3a6d92500ed63ee7f59585c617866e6c1329e Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Wed, 14 Jun 2023 13:10:52 +0300 Subject: [PATCH 01/10] tbel: add parseBytesToFloat --- .../thingsboard/script/api/tbel/TbUtils.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index 6c50561363..10d48d582a 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.script.api.tbel; +import com.google.common.primitives.Bytes; import org.mvel2.ExecutionContext; import org.mvel2.ParserConfiguration; import org.mvel2.execution.ExecutionArrayList; @@ -81,6 +82,14 @@ public class TbUtils { byte[].class, int.class, int.class))); parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", byte[].class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", + byte[].class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", + byte[].class, int.class))); + parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", + List.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", + List.class, int.class))); parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed", double.class, int.class))); parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes", @@ -293,6 +302,33 @@ public class TbUtils { return bb.getInt(); } + public static float parseBytesToFloat(byte[] data, int offset) { + return parseBytesToFloat(data, offset, true); + } + + public static float parseBytesToFloat(byte[] data, int offset, boolean bigEndian) { + if (data != null && data.length > 0) { + int length = 4; + if (offset > data.length) { + throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); + } + if ((offset + length) > data.length) { + throw new IllegalArgumentException("Default length is always 4 bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); + } + int i = parseBytesToInt(data, offset, length, bigEndian); + return Float.intBitsToFloat(i); + } else { + throw new IllegalArgumentException("Array is null or array length is 0!"); + } + } + public static float parseBytesToFloat(List data, int offset) { + return parseBytesToFloat(data, offset, true); + } + + public static float parseBytesToFloat(List data, int offset, boolean bigEndian) { + return parseBytesToFloat(Bytes.toArray(data), offset, bigEndian); + } + public static String bytesToHex(ExecutionArrayList bytesList) { byte[] bytes = new byte[bytesList.size()]; for (int i = 0; i < bytesList.size(); i++) { From 6d0b16e41c06220f325f1b8ebfad30b1f573d00a Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Wed, 14 Jun 2023 18:56:36 +0300 Subject: [PATCH 02/10] tbel: add parseBytesToDouble --- .../thingsboard/script/api/tbel/TbUtils.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index 10d48d582a..0708d1c959 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -16,6 +16,7 @@ package org.thingsboard.script.api.tbel; import com.google.common.primitives.Bytes; +import org.apache.commons.lang3.ArrayUtils; import org.mvel2.ExecutionContext; import org.mvel2.ParserConfiguration; import org.mvel2.execution.ExecutionArrayList; @@ -31,6 +32,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.List; @@ -90,6 +92,14 @@ public class TbUtils { List.class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", List.class, int.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + byte[].class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + byte[].class, int.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + List.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + List.class, int.class))); parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed", double.class, int.class))); parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes", @@ -329,6 +339,39 @@ public class TbUtils { return parseBytesToFloat(Bytes.toArray(data), offset, bigEndian); } + + public static double parseBytesToDouble(byte[] data, int offset) { + return parseBytesToDouble(data, offset, true); + } + + public static double parseBytesToDouble(byte[] data, int offset, boolean bigEndian) { + if (data != null && data.length > 0) { + int length = 8; + if (offset > data.length) { + throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); + } + if ((offset + length) > data.length) { + throw new IllegalArgumentException("Default length is always 4 bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); + } + + byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset+length)); + if (!bigEndian) { + ArrayUtils.reverse(dataBytesArray); + } + return ByteBuffer.wrap(dataBytesArray).getDouble(); + } else { + throw new IllegalArgumentException("Array is null or array length is 0!"); + } + } + + public static double parseBytesToDouble(List data, int offset) { + return parseBytesToDouble(data, offset, true); + } + + public static double parseBytesToDouble(List data, int offset, boolean bigEndian) { + return parseBytesToDouble(Bytes.toArray(data), offset, bigEndian); + } + public static String bytesToHex(ExecutionArrayList bytesList) { byte[] bytes = new byte[bytesList.size()]; for (int i = 0; i < bytesList.size(); i++) { From b79176f3b89fa16dd55c5d6e12cddf7dc482bd4b Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 15 Jun 2023 16:50:31 +0300 Subject: [PATCH 03/10] tbel: add parseLong parseHexToLong parseBytesToLong toFixed (float.class) --- .../thingsboard/script/api/tbel/TbUtils.java | 190 ++++++++++++++---- 1 file changed, 149 insertions(+), 41 deletions(-) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index 0708d1c959..d275466b95 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -64,6 +64,10 @@ public class TbUtils { String.class))); parserConfig.addImport("parseInt", new MethodStub(TbUtils.class.getMethod("parseInt", String.class, int.class))); + parserConfig.addImport("parseLong", new MethodStub(TbUtils.class.getMethod("parseLong", + String.class))); + parserConfig.addImport("parseLong", new MethodStub(TbUtils.class.getMethod("parseLong", + String.class, int.class))); parserConfig.addImport("parseFloat", new MethodStub(TbUtils.class.getMethod("parseFloat", String.class))); parserConfig.addImport("parseDouble", new MethodStub(TbUtils.class.getMethod("parseDouble", @@ -76,6 +80,10 @@ public class TbUtils { String.class))); parserConfig.addImport("parseHexToInt", new MethodStub(TbUtils.class.getMethod("parseHexToInt", String.class, boolean.class))); + parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong", + String.class))); + parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong", + String.class, boolean.class))); parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", List.class, int.class, int.class))); parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", @@ -84,6 +92,14 @@ public class TbUtils { byte[].class, int.class, int.class))); parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", byte[].class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", + List.class, int.class, int.class))); + parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", + List.class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", + byte[].class, int.class, int.class))); + parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", + byte[].class, int.class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", byte[].class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", @@ -92,16 +108,18 @@ public class TbUtils { List.class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", List.class, int.class))); - parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", - byte[].class, int.class, boolean.class))); parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", byte[].class, int.class))); parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", - List.class, int.class, boolean.class))); + byte[].class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + List.class, int.class))); parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", - List.class, int.class))); + List.class, int.class, boolean.class))); parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed", double.class, int.class))); + parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed", + float.class, int.class))); parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes", ExecutionContext.class, String.class))); parserConfig.addImport("base64ToHex", new MethodStub(TbUtils.class.getMethod("base64ToHex", @@ -204,6 +222,38 @@ public class TbUtils { return null; } + public static Long parseLong(String value) { + if (value != null) { + try { + int radix = 10; + if (isHexadecimal(value)) { + radix = 16; + } + return Long.parseLong(prepareNumberString(value), radix); + } catch (NumberFormatException e) { + Double d = parseDouble(value); + if (d != null) { + return d.longValue(); + } + } + } + return null; + } + + public static Long parseLong(String value, int radix) { + if (value != null) { + try { + return Long.parseLong(prepareNumberString(value), radix); + } catch (NumberFormatException e) { + Double d = parseDouble(value); + if (d != null) { + return d.longValue(); + } + } + } + return null; + } + public static Float parseFloat(String value) { if (value != null) { try { @@ -251,6 +301,33 @@ public class TbUtils { return parseBytesToInt(data, 0, data.length, bigEndian); } + public static long parseLittleEndianHexToLong(String hex) { + return parseHexToLong(hex, false); + } + + public static long parseBigEndianHexToLong(String hex) { + return parseHexToInt(hex, true); + } + + public static long parseHexToLong(String hex) { + return parseHexToInt(hex, true); + } + + public static long parseHexToLong(String hex, boolean bigEndian) { + int length = hex.length(); + if (length > 16) { + throw new IllegalArgumentException("Hex string is too large. Maximum 8 symbols allowed."); + } + if (length % 2 > 0) { + throw new IllegalArgumentException("Hex string must be even-length."); + } + byte[] data = new byte[length / 2]; + for (int i = 0; i < length; i += 2) { + data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16)); + } + return parseBytesToLong(data, 0, data.length, bigEndian); + } + public static ExecutionArrayList hexToBytes(ExecutionContext ctx, String hex) { int len = hex.length(); if (len % 2 > 0) { @@ -312,66 +389,93 @@ public class TbUtils { return bb.getInt(); } + public static long parseBytesToLong(List data, int offset, int length) { + return parseBytesToLong(data, offset, length, true); + } + + public static long parseBytesToLong(List data, int offset, int length, boolean bigEndian) { + final byte[] bytes = new byte[data.size()]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = data.get(i); + } + return parseBytesToLong(bytes, offset, length, bigEndian); + } + + public static long parseBytesToLong(byte[] data, int offset, int length) { + return parseBytesToLong(data, offset, length, true); + } + + public static long parseBytesToLong(byte[] data, int offset, int length, boolean bigEndian) { + if (offset > data.length) { + throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); + } + if (length > 8) { + throw new IllegalArgumentException("Length: " + length + " is too large. Maximum 4 bytes is allowed!"); + } + if (offset + length > data.length) { + throw new IllegalArgumentException("Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); + } + var bb = ByteBuffer.allocate(8); + if (!bigEndian) { + bb.order(ByteOrder.LITTLE_ENDIAN); + } + bb.position(bigEndian ? 8 - length : 0); + bb.put(data, offset, length); + bb.position(0); + return bb.getLong(); + } + public static float parseBytesToFloat(byte[] data, int offset) { return parseBytesToFloat(data, offset, true); } - public static float parseBytesToFloat(byte[] data, int offset, boolean bigEndian) { - if (data != null && data.length > 0) { - int length = 4; - if (offset > data.length) { - throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); - } - if ((offset + length) > data.length) { - throw new IllegalArgumentException("Default length is always 4 bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); - } - int i = parseBytesToInt(data, offset, length, bigEndian); - return Float.intBitsToFloat(i); - } else { - throw new IllegalArgumentException("Array is null or array length is 0!"); - } - } public static float parseBytesToFloat(List data, int offset) { - return parseBytesToFloat(data, offset, true); + return parseBytesToFloat(data, offset,true); } public static float parseBytesToFloat(List data, int offset, boolean bigEndian) { return parseBytesToFloat(Bytes.toArray(data), offset, bigEndian); } + public static float parseBytesToFloat(byte[] data, int offset, boolean bigEndian) { + byte[] bytesToNumber = prepareBytesToNumber (data, offset, 4, bigEndian); + return ByteBuffer.wrap(bytesToNumber).getFloat(); + } + public static double parseBytesToDouble(byte[] data, int offset) { return parseBytesToDouble(data, offset, true); } - public static double parseBytesToDouble(byte[] data, int offset, boolean bigEndian) { - if (data != null && data.length > 0) { - int length = 8; - if (offset > data.length) { - throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); - } - if ((offset + length) > data.length) { - throw new IllegalArgumentException("Default length is always 4 bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); - } - - byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset+length)); - if (!bigEndian) { - ArrayUtils.reverse(dataBytesArray); - } - return ByteBuffer.wrap(dataBytesArray).getDouble(); - } else { - throw new IllegalArgumentException("Array is null or array length is 0!"); - } - } - public static double parseBytesToDouble(List data, int offset) { - return parseBytesToDouble(data, offset, true); + return parseBytesToDouble(data, offset,true); } public static double parseBytesToDouble(List data, int offset, boolean bigEndian) { return parseBytesToDouble(Bytes.toArray(data), offset, bigEndian); } + public static double parseBytesToDouble(byte[] data, int offset, boolean bigEndian) { + byte[] bytesToNumber = prepareBytesToNumber (data, offset, 8, bigEndian); + return ByteBuffer.wrap(bytesToNumber).getDouble(); + } + + private static byte [] prepareBytesToNumber(byte[] data, int offset, int length, boolean bigEndian) { + if (offset > data.length) { + throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); + } + if ((offset + length) > data.length) { + throw new IllegalArgumentException("Default length is always " + length + " bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); + } + byte[] bytesToNumber = new byte[length]; + byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset+length)); + if (!bigEndian) { + ArrayUtils.reverse(dataBytesArray); + } + System.arraycopy(dataBytesArray, 0, bytesToNumber, 0, length); + return bytesToNumber; + } + public static String bytesToHex(ExecutionArrayList bytesList) { byte[] bytes = new byte[bytesList.size()]; for (int i = 0; i < bytesList.size(); i++) { @@ -394,6 +498,10 @@ public class TbUtils { return BigDecimal.valueOf(value).setScale(precision, RoundingMode.HALF_UP).doubleValue(); } + public static float toFixed(float value, int precision) { + return BigDecimal.valueOf(value).setScale(precision, RoundingMode.HALF_UP).floatValue(); + } + private static boolean isHexadecimal(String value) { return value != null && (value.contains("0x") || value.contains("0X")); } From 59dfb4f4e1b11a18cd409ba058b6f57d084351d1 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 15 Jun 2023 17:02:34 +0300 Subject: [PATCH 04/10] tbel: refactoring prepareBytesToNumber --- .../main/java/org/thingsboard/script/api/tbel/TbUtils.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index d275466b95..dec4ff6f15 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -467,13 +467,11 @@ public class TbUtils { if ((offset + length) > data.length) { throw new IllegalArgumentException("Default length is always " + length + " bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); } - byte[] bytesToNumber = new byte[length]; byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset+length)); if (!bigEndian) { ArrayUtils.reverse(dataBytesArray); } - System.arraycopy(dataBytesArray, 0, bytesToNumber, 0, length); - return bytesToNumber; + return dataBytesArray; } public static String bytesToHex(ExecutionArrayList bytesList) { From f15d84e44aff576ed4208f6d7be5f601d6e882d6 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 15 Jun 2023 19:20:50 +0300 Subject: [PATCH 05/10] tbel: add parseHexToLong, Float, Double --- .../thingsboard/script/api/tbel/TbUtils.java | 117 +++++++++++++----- 1 file changed, 83 insertions(+), 34 deletions(-) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index dec4ff6f15..aced1512b6 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -80,18 +80,22 @@ public class TbUtils { String.class))); parserConfig.addImport("parseHexToInt", new MethodStub(TbUtils.class.getMethod("parseHexToInt", String.class, boolean.class))); + parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", + List.class, int.class, int.class))); + parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", + List.class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", + byte[].class, int.class, int.class))); + parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", + byte[].class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseLittleEndianHexToLong", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToLong", + String.class))); + parserConfig.addImport("parseBigEndianHexToLong", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToLong", + String.class))); parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong", String.class))); parserConfig.addImport("parseHexToLong", new MethodStub(TbUtils.class.getMethod("parseHexToLong", String.class, boolean.class))); - parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", - List.class, int.class, int.class))); - parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", - List.class, int.class, int.class, boolean.class))); - parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", - byte[].class, int.class, int.class))); - parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt", - byte[].class, int.class, int.class, boolean.class))); parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", List.class, int.class, int.class))); parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", @@ -100,20 +104,36 @@ public class TbUtils { byte[].class, int.class, int.class))); parserConfig.addImport("parseBytesToLong", new MethodStub(TbUtils.class.getMethod("parseBytesToLong", byte[].class, int.class, int.class, boolean.class))); + parserConfig.addImport("parseLittleEndianHexToFloat", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToFloat", + String.class))); + parserConfig.addImport("parseBigEndianHexToFloat", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToFloat", + String.class))); + parserConfig.addImport("parseHexToFloat", new MethodStub(TbUtils.class.getMethod("parseHexToFloat", + String.class))); + parserConfig.addImport("parseHexToFloat", new MethodStub(TbUtils.class.getMethod("parseHexToFloat", + String.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", byte[].class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", byte[].class, int.class))); - parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", - List.class, int.class, boolean.class))); + parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", + List.class, int.class, boolean.class))); parserConfig.addImport("parseBytesToFloat", new MethodStub(TbUtils.class.getMethod("parseBytesToFloat", List.class, int.class))); + parserConfig.addImport("parseLittleEndianHexToDouble", new MethodStub(TbUtils.class.getMethod("parseLittleEndianHexToDouble", + String.class))); + parserConfig.addImport("parseBigEndianHexToDouble", new MethodStub(TbUtils.class.getMethod("parseBigEndianHexToDouble", + String.class))); + parserConfig.addImport("parseHexToDouble", new MethodStub(TbUtils.class.getMethod("parseHexToDouble", + String.class))); + parserConfig.addImport("parseHexToDouble", new MethodStub(TbUtils.class.getMethod("parseHexToDouble", + String.class, boolean.class))); parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", byte[].class, int.class))); - parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", byte[].class, int.class, boolean.class))); - parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", - List.class, int.class))); + parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", + List.class, int.class))); parserConfig.addImport("parseBytesToDouble", new MethodStub(TbUtils.class.getMethod("parseBytesToDouble", List.class, int.class, boolean.class))); parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed", @@ -287,17 +307,7 @@ public class TbUtils { } public static int parseHexToInt(String hex, boolean bigEndian) { - int length = hex.length(); - if (length > 8) { - throw new IllegalArgumentException("Hex string is too large. Maximum 8 symbols allowed."); - } - if (length % 2 > 0) { - throw new IllegalArgumentException("Hex string must be even-length."); - } - byte[] data = new byte[length / 2]; - for (int i = 0; i < length; i += 2) { - data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16)); - } + byte[] data = prepareHexToBytesNumber(hex, 8); return parseBytesToInt(data, 0, data.length, bigEndian); } @@ -306,16 +316,55 @@ public class TbUtils { } public static long parseBigEndianHexToLong(String hex) { - return parseHexToInt(hex, true); + return parseHexToLong(hex, true); } public static long parseHexToLong(String hex) { - return parseHexToInt(hex, true); + return parseHexToLong(hex, true); } public static long parseHexToLong(String hex, boolean bigEndian) { + byte[] data = prepareHexToBytesNumber(hex, 16); + return parseBytesToLong(data, 0, data.length, bigEndian); + } + + public static float parseLittleEndianHexToFloat(String hex) { + return parseHexToFloat(hex, false); + } + + public static float parseBigEndianHexToFloat(String hex) { + return parseHexToFloat(hex, true); + } + + public static float parseHexToFloat(String hex) { + return parseHexToFloat(hex, true); + } + + public static float parseHexToFloat(String hex, boolean bigEndian) { + byte[] data = prepareHexToBytesNumber(hex, 8); + return parseBytesToFloat(data, 0, bigEndian); + } + + public static double parseLittleEndianHexToDouble(String hex) { + return parseHexToDouble(hex, false); + } + + public static double parseBigEndianHexToDouble(String hex) { + return parseHexToDouble(hex, true); + } + + public static double parseHexToDouble(String hex) { + return parseHexToDouble(hex, true); + } + + public static double parseHexToDouble(String hex, boolean bigEndian) { + byte[] data = prepareHexToBytesNumber(hex, 16); + return parseBytesToDouble(data, 0, bigEndian); + } + + private static byte[] prepareHexToBytesNumber(String hex, int len) { int length = hex.length(); - if (length > 16) { + if (length > len) { throw new IllegalArgumentException("Hex string is too large. Maximum 8 symbols allowed."); } if (length % 2 > 0) { @@ -325,7 +374,7 @@ public class TbUtils { for (int i = 0; i < length; i += 2) { data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16)); } - return parseBytesToLong(data, 0, data.length, bigEndian); + return data; } public static ExecutionArrayList hexToBytes(ExecutionContext ctx, String hex) { @@ -430,7 +479,7 @@ public class TbUtils { } public static float parseBytesToFloat(List data, int offset) { - return parseBytesToFloat(data, offset,true); + return parseBytesToFloat(data, offset, true); } public static float parseBytesToFloat(List data, int offset, boolean bigEndian) { @@ -438,7 +487,7 @@ public class TbUtils { } public static float parseBytesToFloat(byte[] data, int offset, boolean bigEndian) { - byte[] bytesToNumber = prepareBytesToNumber (data, offset, 4, bigEndian); + byte[] bytesToNumber = prepareBytesToNumber(data, offset, 4, bigEndian); return ByteBuffer.wrap(bytesToNumber).getFloat(); } @@ -448,7 +497,7 @@ public class TbUtils { } public static double parseBytesToDouble(List data, int offset) { - return parseBytesToDouble(data, offset,true); + return parseBytesToDouble(data, offset, true); } public static double parseBytesToDouble(List data, int offset, boolean bigEndian) { @@ -456,18 +505,18 @@ public class TbUtils { } public static double parseBytesToDouble(byte[] data, int offset, boolean bigEndian) { - byte[] bytesToNumber = prepareBytesToNumber (data, offset, 8, bigEndian); + byte[] bytesToNumber = prepareBytesToNumber(data, offset, 8, bigEndian); return ByteBuffer.wrap(bytesToNumber).getDouble(); } - private static byte [] prepareBytesToNumber(byte[] data, int offset, int length, boolean bigEndian) { + private static byte[] prepareBytesToNumber(byte[] data, int offset, int length, boolean bigEndian) { if (offset > data.length) { throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!"); } if ((offset + length) > data.length) { throw new IllegalArgumentException("Default length is always " + length + " bytes. Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!"); } - byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset+length)); + byte[] dataBytesArray = Arrays.copyOfRange(data, offset, (offset + length)); if (!bigEndian) { ArrayUtils.reverse(dataBytesArray); } From 35dfa1e7bd82d9bb1adf75b6d5ceb9ad3cf4ea72 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Fri, 16 Jun 2023 18:57:20 +0300 Subject: [PATCH 06/10] tbel: add parseLong Test --- .../thingsboard/script/api/tbel/TbUtils.java | 104 +++++++++++------- .../script/api/tbel/TbUtilsTest.java | 93 ++++++++++++++++ 2 files changed, 155 insertions(+), 42 deletions(-) diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index aced1512b6..b337612011 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.StringUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; +import java.math.BigInteger; import java.math.RoundingMode; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -211,67 +212,71 @@ public class TbUtils { } public static Integer parseInt(String value) { - if (value != null) { - try { - int radix = 10; - if (isHexadecimal(value)) { - radix = 16; - } - return Integer.parseInt(prepareNumberString(value), radix); - } catch (NumberFormatException e) { - Float f = parseFloat(value); - if (f != null) { - return f.intValue(); - } - } - } - return null; + int radix = getRadix(value); + return parseInt(value, radix); } public static Integer parseInt(String value, int radix) { - if (value != null) { + if (StringUtils.isNotBlank(value)) { try { - return Integer.parseInt(prepareNumberString(value), radix); - } catch (NumberFormatException e) { - Float f = parseFloat(value); - if (f != null) { - return f.intValue(); + String valueP = prepareNumberString(value); + isValidRadix(valueP, radix); + try { + return Integer.parseInt(valueP, radix); + } catch (NumberFormatException e) { + BigInteger bi = new BigInteger(valueP, radix); + if (bi.compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) > 0) + throw new NumberFormatException("Value \"" + value + "\" is greater than the maximum Integer value " + Integer.MAX_VALUE + " !"); + if (bi.compareTo(BigInteger.valueOf(Integer.MIN_VALUE)) < 0) + throw new NumberFormatException("Value \"" + value + "\" is less than the minimum Integer value " + Integer.MIN_VALUE + " !"); + Float f = parseFloat(valueP); + if (f != null) { + return f.intValue(); + } else { + throw new NumberFormatException(e.getMessage()); + } } + } catch (NumberFormatException e) { + throw new NumberFormatException(e.getMessage()); } } return null; } public static Long parseLong(String value) { - if (value != null) { + int radix = getRadix(value); + return parseLong(value, radix); + } + + public static Long parseLong(String value, int radix) { + if (StringUtils.isNotBlank(value)) { try { - int radix = 10; - if (isHexadecimal(value)) { - radix = 16; + String valueP = prepareNumberString(value); + isValidRadix(valueP, radix); + try { + return Long.parseLong(valueP, radix); + } catch (NumberFormatException e) { + BigInteger bi = new BigInteger(valueP, radix); + if (bi.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) + throw new NumberFormatException("Value \"" + value + "\"is greater than the maximum Long value " + Long.MAX_VALUE + " !"); + if (bi.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0) + throw new NumberFormatException("Value \"" + value + "\" is less than the minimum Long value " + Long.MIN_VALUE + " !"); + Double dd = parseDouble(valueP); + if (dd != null) { + return dd.longValue(); + } else { + throw new NumberFormatException(e.getMessage()); + } } - return Long.parseLong(prepareNumberString(value), radix); } catch (NumberFormatException e) { - Double d = parseDouble(value); - if (d != null) { - return d.longValue(); - } + throw new NumberFormatException(e.getMessage()); } } return null; } - public static Long parseLong(String value, int radix) { - if (value != null) { - try { - return Long.parseLong(prepareNumberString(value), radix); - } catch (NumberFormatException e) { - Double d = parseDouble(value); - if (d != null) { - return d.longValue(); - } - } - } - return null; + private static int getRadix(String value, int... radixS) { + return radixS.length > 0 ? radixS[0] : isHexadecimal(value) ? 16 : 10; } public static Float parseFloat(String value) { @@ -622,4 +627,19 @@ public class TbUtils { } } } + + public static boolean isValidRadix(String value, int radix) { + for (int i = 0; i < value.length(); i++) { + if (i == 0 && value.charAt(i) == '-') { + if (value.length() == 1) + throw new NumberFormatException("Failed radix [" + radix + "] for value: \"" + value + "\"!"); + else + continue; + } + if (Character.digit(value.charAt(i), radix) < 0) + throw new NumberFormatException("Failed radix: [" + radix + "] for value: \"" + value + "\"!"); + } + return true; + } + } diff --git a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java index 82cd74ca30..35a8936e4b 100644 --- a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java +++ b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java @@ -26,6 +26,7 @@ import org.mvel2.SandboxedParserConfiguration; import org.mvel2.execution.ExecutionArrayList; import org.mvel2.execution.ExecutionHashMap; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Calendar; @@ -182,6 +183,98 @@ public class TbUtilsTest { Assert.assertEquals(expectedMapWithoutPaths, actualMapWithoutPaths); } + @Test + public void parseInt() { + Assert.assertNull(TbUtils.parseInt(null)); + Assert.assertNull(TbUtils.parseInt("")); + Assert.assertNull(TbUtils.parseInt(" ")); + + Assert.assertEquals(java.util.Optional.of(0).get(), TbUtils.parseInt("0")); + Assert.assertEquals(java.util.Optional.of(0).get(), TbUtils.parseInt("-0")); + Assert.assertEquals(java.util.Optional.of(473).get(), TbUtils.parseInt("473")); + Assert.assertEquals(java.util.Optional.of(-255).get(), TbUtils.parseInt("-0xFF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("0xFG")); + + Assert.assertEquals(java.util.Optional.of(102).get(), TbUtils.parseInt("1100110", 2)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("1100210", 2)); + + Assert.assertEquals(java.util.Optional.of(63).get(), TbUtils.parseInt("77", 8)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("18", 8)); + + Assert.assertEquals(java.util.Optional.of(-255).get(), TbUtils.parseInt("-FF", 16)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FG", 16)); + + + Assert.assertEquals(java.util.Optional.of(Integer.MAX_VALUE).get(), TbUtils.parseInt(Integer.toString(Integer.MAX_VALUE), 10)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt(BigInteger.valueOf(Integer.MAX_VALUE).add(BigInteger.valueOf(1)).toString(10), 10)); + Assert.assertEquals(java.util.Optional.of(Integer.MIN_VALUE).get(), TbUtils.parseInt(Integer.toString(Integer.MIN_VALUE), 10)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt(BigInteger.valueOf(Integer.MIN_VALUE).subtract(BigInteger.valueOf(1)).toString(10), 10)); + + Assert.assertEquals(java.util.Optional.of(506070563).get(), TbUtils.parseInt("KonaIn", 30)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("KonaIn", 10)); + } + + @Test + public void parseLong() { + Assert.assertNull(TbUtils.parseLong(null)); + Assert.assertNull(TbUtils.parseLong("")); + Assert.assertNull(TbUtils.parseLong(" ")); + + Assert.assertEquals(java.util.Optional.of(0L).get(), TbUtils.parseLong("0")); + Assert.assertEquals(java.util.Optional.of(0L).get(), TbUtils.parseLong("-0")); + Assert.assertEquals(java.util.Optional.of(473L).get(), TbUtils.parseLong("473")); + Assert.assertEquals(java.util.Optional.of(-65535L).get(), TbUtils.parseLong("-0xFFFF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FFFF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("0xFGFF")); + + Assert.assertEquals(java.util.Optional.of(13158L).get(), TbUtils.parseLong("11001101100110", 2)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("11001101100210", 2)); + + Assert.assertEquals(java.util.Optional.of(9223372036854775807L).get(), TbUtils.parseLong("777777777777777777777", 8)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("1787", 8)); + + Assert.assertEquals(java.util.Optional.of(-255L).get(), TbUtils.parseLong("-FF", 16)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("FG", 16)); + + + Assert.assertEquals(java.util.Optional.of(Long.MAX_VALUE).get(), TbUtils.parseLong(Long.toString(Long.MAX_VALUE), 10)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong(BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.valueOf(1)).toString(10), 10)); + Assert.assertEquals(java.util.Optional.of(Long.MIN_VALUE).get(), TbUtils.parseLong(Long.toString(Long.MIN_VALUE), 10)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong(BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.valueOf(1)).toString(10), 10)); + + Assert.assertEquals(java.util.Optional.of(218840926543L).get(), TbUtils.parseLong("KonaLong", 27)); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("KonaLong", 10)); + } + + + // parseLong String.class, int.class +// parseLittleEndianHexToLong String.class + // parseBigEndianHexToLong String.class + // parseHexToLong String.class + // parseHexToLong String.class boolean.class + // parseBytesToLong List.class, int.class, int.class + // parseBytesToLong List.class, int.class, int.class, boolean.class + // parseBytesToLong byte[].class, int.class, int.class + // parseBytesToLong byte[].class, int.class, int.class, boolean.class + // parseLittleEndianHexToFloat String.class + // parseBigEndianHexToFloat String.class + // parseHexToFloat String.class + // parseHexToFloat String.class boolean.class + // parseBytesToFloat byte[].class, int.class, boolean.class + // parseBytesToFloat byte[].class, int.class + // parseBytesToFloat List.class, int.class, boolean.class + // parseBytesToFloat List.class, int.class + // toFixed float.class, int.class + // parseLittleEndianHexToDouble String.class + // parseBigEndianHexToDouble String.class + // parseHexToDouble String.class + // parseHexToDouble String.class boolean.class + // parseBytesToDouble byte[].class, int.class + // parseBytesToDouble byte[].class, int.class, boolean.class + // parseBytesToDouble List.class, int.class + // parseBytesToDouble List.class, int.class boolean.class + private static String keyToValue(String key, String extraSymbol) { return key + "Value" + (extraSymbol == null ? "" : extraSymbol); From 6894ffb8a99b85fabd9c2a21298bfea11de43f95 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Mon, 19 Jun 2023 17:38:41 +0300 Subject: [PATCH 07/10] tbel: add Tests - long, float, double --- .../script/api/tbel/TbUtilsTest.java | 130 +++++++++++++----- 1 file changed, 99 insertions(+), 31 deletions(-) diff --git a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java index 35a8936e4b..e2f239f30e 100644 --- a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java +++ b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java @@ -15,6 +15,7 @@ */ package org.thingsboard.script.api.tbel; +import com.google.common.primitives.Bytes; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Assert; @@ -39,6 +40,23 @@ public class TbUtilsTest { private ExecutionContext ctx; + private final String intValHex = "41EA62CC"; + private final float floatVal = 29.29824f; + private final String floatValStr = "29.29824"; + + + private final String floatValHexRev = "CC62EA41"; + private final float floatValRev = -5.948442E7f; + + private final long longVal = 0x409B04B10CB295EAL; + private final String longValHex = "409B04B10CB295EA"; + private final long longValRev = 0xEA95B20CB1049B40L; + private final String longValHexRev = "EA95B20CB1049B40"; + private final String doubleValStr = "1729.1729"; + private final double doubleVal = 1729.1729; + private final double doubleValRev = -2.7208640774822924E205; + + @Before public void before() { SandboxedParserConfiguration parserConfig = ParserContext.enableSandboxedMode(); @@ -63,6 +81,7 @@ public class TbUtilsTest { @Test public void parseHexToInt() { Assert.assertEquals(0xAB, TbUtils.parseHexToInt("AB")); + Assert.assertEquals(0xABBA, TbUtils.parseHexToInt("ABBA", true)); Assert.assertEquals(0xBAAB, TbUtils.parseHexToInt("ABBA", false)); Assert.assertEquals(0xAABBCC, TbUtils.parseHexToInt("AABBCC", true)); @@ -215,6 +234,37 @@ public class TbUtilsTest { Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("KonaIn", 10)); } + @Test + public void parseFloat() { + Assert.assertEquals(java.util.Optional.of(floatVal).get(), TbUtils.parseFloat(floatValStr)); + } + + @Test + public void toFixedFloat() { + float actualF = TbUtils.toFixed(floatVal, 3); + Assert.assertEquals(1, Float.compare(floatVal, actualF)); + Assert.assertEquals(0, Float.compare(29.298f, actualF)); + } + + @Test + public void parseHexToFloat() { + Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseHexToFloat(intValHex))); + Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseHexToFloat(intValHex, false))); + Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBigEndianHexToFloat(intValHex))); + Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseLittleEndianHexToFloat(floatValHexRev))); + } + + @Test + public void arseBytesToFloat() { + byte[] floatValByte = {65, -22, 98, -52}; + Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBytesToFloat(floatValByte, 0))); + Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseBytesToFloat(floatValByte, 0, false))); + + List floatVaList = Bytes.asList(floatValByte); + Assert.assertEquals(0, Float.compare(floatVal, TbUtils.parseBytesToFloat(floatVaList, 0))); + Assert.assertEquals(0, Float.compare(floatValRev, TbUtils.parseBytesToFloat(floatVaList, 0, false))); + } + @Test public void parseLong() { Assert.assertNull(TbUtils.parseLong(null)); @@ -225,8 +275,8 @@ public class TbUtilsTest { Assert.assertEquals(java.util.Optional.of(0L).get(), TbUtils.parseLong("-0")); Assert.assertEquals(java.util.Optional.of(473L).get(), TbUtils.parseLong("473")); Assert.assertEquals(java.util.Optional.of(-65535L).get(), TbUtils.parseLong("-0xFFFF")); - Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("FFFF")); - Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseInt("0xFGFF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("FFFFFFFF")); + Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("0xFGFFFFFF")); Assert.assertEquals(java.util.Optional.of(13158L).get(), TbUtils.parseLong("11001101100110", 2)); Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("11001101100210", 2)); @@ -247,37 +297,55 @@ public class TbUtilsTest { Assert.assertThrows(NumberFormatException.class, () -> TbUtils.parseLong("KonaLong", 10)); } + @Test + public void parseHexToLong() { + Assert.assertEquals(longVal, TbUtils.parseHexToLong(longValHex)); + Assert.assertEquals(longVal, TbUtils.parseHexToLong(longValHexRev, false)); + Assert.assertEquals(longVal, TbUtils.parseBigEndianHexToLong(longValHex)); + Assert.assertEquals(longVal, TbUtils.parseLittleEndianHexToLong(longValHexRev)); + } - // parseLong String.class, int.class -// parseLittleEndianHexToLong String.class - // parseBigEndianHexToLong String.class - // parseHexToLong String.class - // parseHexToLong String.class boolean.class - // parseBytesToLong List.class, int.class, int.class - // parseBytesToLong List.class, int.class, int.class, boolean.class - // parseBytesToLong byte[].class, int.class, int.class - // parseBytesToLong byte[].class, int.class, int.class, boolean.class - // parseLittleEndianHexToFloat String.class - // parseBigEndianHexToFloat String.class - // parseHexToFloat String.class - // parseHexToFloat String.class boolean.class - // parseBytesToFloat byte[].class, int.class, boolean.class - // parseBytesToFloat byte[].class, int.class - // parseBytesToFloat List.class, int.class, boolean.class - // parseBytesToFloat List.class, int.class - // toFixed float.class, int.class - // parseLittleEndianHexToDouble String.class - // parseBigEndianHexToDouble String.class - // parseHexToDouble String.class - // parseHexToDouble String.class boolean.class - // parseBytesToDouble byte[].class, int.class - // parseBytesToDouble byte[].class, int.class, boolean.class - // parseBytesToDouble List.class, int.class - // parseBytesToDouble List.class, int.class boolean.class + @Test + public void parseBytesToLong() { + byte[] longValByte = {64, -101, 4, -79, 12, -78, -107, -22}; + Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longValByte, 0, 8)); + Bytes.reverse(longValByte); + Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longValByte, 0, 8, false)); + List longVaList = Bytes.asList(longValByte); + Assert.assertEquals(longVal, TbUtils.parseBytesToLong(longVaList, 0, 8, false)); + Assert.assertEquals(longValRev, TbUtils.parseBytesToLong(longVaList, 0, 8)); + } - private static String keyToValue(String key, String extraSymbol) { - return key + "Value" + (extraSymbol == null ? "" : extraSymbol); + @Test + public void parsDouble() { + Assert.assertEquals(java.util.Optional.of(doubleVal).get(), TbUtils.parseDouble(doubleValStr)); + } + + @Test + public void toFixedDouble() { + double actualD = TbUtils.toFixed(doubleVal, 3); + Assert.assertEquals(-1, Double.compare(doubleVal, actualD)); + Assert.assertEquals(0, Double.compare(1729.173, actualD)); + } + + @Test + public void parseHexToDouble() { + Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseHexToDouble(longValHex))); + Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseHexToDouble(longValHex, false))); + Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBigEndianHexToDouble(longValHex))); + Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseLittleEndianHexToDouble(longValHexRev))); + } + + @Test + public void arseBytesToDouble() { + byte[] doubleValByte = {64, -101, 4, -79, 12, -78, -107, -22}; + Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBytesToDouble(doubleValByte, 0))); + Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseBytesToDouble(doubleValByte, 0, false))); + + List doubleVaList = Bytes.asList(doubleValByte); + Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBytesToDouble(doubleVaList, 0))); + Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseBytesToDouble(doubleVaList, 0, false))); } private static List toList(byte[] data) { @@ -287,5 +355,5 @@ public class TbUtilsTest { } return result; } - } + From d9c028f566ec695f6d9b975bd5c86abd1564af9d Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Mon, 19 Jun 2023 17:41:28 +0300 Subject: [PATCH 08/10] tbel: add Tests - long, float, double (syntax) --- .../test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java index e2f239f30e..b6d5395af8 100644 --- a/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java +++ b/common/script/script-api/src/test/java/org/thingsboard/script/api/tbel/TbUtilsTest.java @@ -338,7 +338,7 @@ public class TbUtilsTest { } @Test - public void arseBytesToDouble() { + public void parseBytesToDouble() { byte[] doubleValByte = {64, -101, 4, -79, 12, -78, -107, -22}; Assert.assertEquals(0, Double.compare(doubleVal, TbUtils.parseBytesToDouble(doubleValByte, 0))); Assert.assertEquals(0, Double.compare(doubleValRev, TbUtils.parseBytesToDouble(doubleValByte, 0, false))); From 649898566c4b5c652bd1d008bc4abcb9ef97744c Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 21 Jun 2023 19:05:42 +0300 Subject: [PATCH 09/10] Add unrecoverable initialization error for actors. Fix timeouts on missing rule chain id in the rule chain input node --- .../actors/ruleChain/DefaultTbContext.java | 5 +- .../ruleChain/RuleChainManagerActor.java | 9 ++- .../actors/shared/RuleChainErrorActor.java | 78 +++++++++++++++++++ .../server/actors/TbActorMailbox.java | 17 +++- .../server/common/msg/TbActorError.java | 22 ++++++ .../common/msg/aware/RuleChainAwareMsg.java | 3 + .../rule/engine/api/TbContext.java | 2 +- .../rule/engine/api/TbNodeException.java | 18 ++++- .../rule/engine/api/util/TbNodeUtils.java | 2 +- .../rule/engine/action/TbCreateAlarmNode.java | 2 +- 10 files changed, 149 insertions(+), 9 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 59d633540e..d32867c626 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -35,6 +35,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.slack.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; @@ -843,9 +844,9 @@ class DefaultTbContext implements TbContext { } @Override - public void checkTenantEntity(EntityId entityId) { + public void checkTenantEntity(EntityId entityId) throws TbNodeException { if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) { - throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant."); + throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index 9534104ec1..7f919754fc 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.DefaultActorService; +import org.thingsboard.server.actors.shared.RuleChainErrorActor; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.dao.rule.RuleChainService; import java.util.function.Function; @@ -86,7 +88,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { () -> DefaultActorService.RULE_DISPATCHER_NAME, () -> { RuleChain ruleChain = provider.apply(ruleChainId); - return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); + if (ruleChain == null) { + return new RuleChainErrorActor.ActorCreator(systemContext, tenantId, + new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!")); + } else { + return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); + } }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java new file mode 100644 index 0000000000..a204c7b286 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/shared/RuleChainErrorActor.java @@ -0,0 +1,78 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.shared; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbStringActorId; +import org.thingsboard.server.actors.service.ContextAwareActor; +import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; +import org.thingsboard.server.common.msg.queue.RuleEngineException; + +import java.util.UUID; + +@Slf4j +public class RuleChainErrorActor extends ContextAwareActor { + + private final TenantId tenantId; + private final RuleEngineException error; + + private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) { + super(systemContext); + this.tenantId = tenantId; + this.error = error; + } + + @Override + protected boolean doProcess(TbActorMsg msg) { + if (msg instanceof RuleChainAwareMsg) { + log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg); + var rcMsg = (RuleChainAwareMsg) msg; + rcMsg.getMsg().getCallback().onFailure(error); + return true; + } else { + return false; + } + } + + public static class ActorCreator extends ContextBasedCreator { + + private final TenantId tenantId; + private final RuleEngineException error; + + public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) { + super(context); + this.tenantId = tenantId; + this.error = error; + } + + @Override + public TbActorId createActorId() { + return new TbStringActorId(UUID.randomUUID().toString()); + } + + @Override + public TbActor createActor() { + return new RuleChainErrorActor(context, tenantId, error); + } + } + +} diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index 587da8f3b1..ad1604f7b0 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -19,6 +19,7 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorError; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbActorStopReason; @@ -69,9 +70,14 @@ public final class TbActorMailbox implements TbActorCtx { } } } catch (Throwable t) { - log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); + InitFailureStrategy strategy; int attemptIdx = attempt + 1; - InitFailureStrategy strategy = actor.onInitFailure(attempt, t); + if (isUnrecoverable(t)) { + strategy = InitFailureStrategy.stop(); + } else { + log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t); + strategy = actor.onInitFailure(attempt, t); + } if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) { log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t); stopReason = TbActorStopReason.INIT_FAILED; @@ -88,6 +94,13 @@ public final class TbActorMailbox implements TbActorCtx { } } + private static boolean isUnrecoverable(Throwable t) { + if (t instanceof TbActorException && t.getCause() != null) { + t = t.getCause(); + } + return t instanceof TbActorError && ((TbActorError) t).isUnrecoverable(); + } + private void enqueue(TbActorMsg msg, boolean highPriority) { if (!destroyInProgress.get()) { if (highPriority) { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java new file mode 100644 index 0000000000..fc27feb096 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbActorError.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +public interface TbActorError { + + boolean isUnrecoverable(); + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java index d0e90ae421..5fbc857e0c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java @@ -17,9 +17,12 @@ package org.thingsboard.server.common.msg.aware; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.TbMsg; public interface RuleChainAwareMsg extends TbActorMsg { RuleChainId getRuleChainId(); + + TbMsg getMsg(); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 1561dac05e..b47264c280 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -210,7 +210,7 @@ public interface TbContext { void schedule(Runnable runnable, long delay, TimeUnit timeUnit); - void checkTenantEntity(EntityId entityId); + void checkTenantEntity(EntityId entityId) throws TbNodeException; boolean isLocalEntity(EntityId entityId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java index 32341dbf2b..a8ce9d7e42 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbNodeException.java @@ -15,17 +15,33 @@ */ package org.thingsboard.rule.engine.api; +import lombok.Getter; +import org.thingsboard.server.common.msg.TbActorError; + /** * Created by ashvayka on 19.01.18. */ -public class TbNodeException extends Exception { +public class TbNodeException extends Exception implements TbActorError { + + @Getter + private final boolean unrecoverable; public TbNodeException(String message) { + this(message, false); + } + + public TbNodeException(String message, boolean unrecoverable) { super(message); + this.unrecoverable = unrecoverable; } public TbNodeException(Exception e) { + this(e, false); + } + + public TbNodeException(Exception e, boolean unrecoverable) { super(e); + this.unrecoverable = unrecoverable; } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java index 1ca1d2516e..73cac87832 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/util/TbNodeUtils.java @@ -45,7 +45,7 @@ public class TbNodeUtils { try { return mapper.treeToValue(configuration.getData(), clazz); } catch (JsonProcessingException e) { - throw new TbNodeException(e); + throw new TbNodeException(e, true); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java index de68324c98..5297269c45 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java @@ -67,7 +67,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode Date: Thu, 22 Jun 2023 15:59:36 +0300 Subject: [PATCH 10/10] Ability to force ack for external nodes --- .../server/actors/ActorSystemContext.java | 4 ++ .../actors/ruleChain/DefaultTbContext.java | 36 ++++++++--- .../src/main/resources/thingsboard.yml | 4 ++ .../thingsboard/server/common/msg/TbMsg.java | 5 ++ .../rule/engine/api/TbContext.java | 4 ++ .../rule/engine/aws/sns/TbSnsNode.java | 11 ++-- .../rule/engine/aws/sqs/TbSqsNode.java | 9 ++- .../external/TbAbstractExternalNode.java | 61 +++++++++++++++++++ .../rule/engine/gcp/pubsub/TbPubSubNode.java | 11 ++-- .../rule/engine/kafka/TbKafkaNode.java | 11 ++-- .../rule/engine/mail/TbSendEmailNode.java | 11 ++-- .../rule/engine/mqtt/TbMqttNode.java | 12 ++-- .../engine/mqtt/azure/TbAzureIotHubNode.java | 3 +- .../notification/TbNotificationNode.java | 23 ++++--- .../rule/engine/notification/TbSlackNode.java | 9 ++- .../rule/engine/rabbitmq/TbRabbitMqNode.java | 12 ++-- .../rule/engine/rest/TbHttpClient.java | 24 ++++---- .../rule/engine/rest/TbRestApiCallNode.java | 13 ++-- .../rule/engine/sms/TbSendSmsNode.java | 9 ++- .../rule/engine/rest/TbHttpClientTest.java | 4 +- 20 files changed, 200 insertions(+), 76 deletions(-) create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 515c635e68..7e5dfed5f0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -538,6 +538,10 @@ public class ActorSystemContext { @Getter private int maxRpcRetries; + @Value("${actors.rule.external.force_ack:false}") + @Getter + private boolean externalNodeForceAck; + @Getter @Setter private TbActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index d32867c626..ca83852fe4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -218,6 +218,12 @@ class DefaultTbContext implements TbContext { enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); } + @Override + public void enqueueForTellFailure(TbMsg tbMsg, Throwable th) { + TopicPartitionInfo tpi = resolvePartition(tbMsg); + enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), getFailureMessage(th), null, null); + } + @Override public void enqueueForTellNext(TbMsg tbMsg, String relationType) { TopicPartitionInfo tpi = resolvePartition(tbMsg); @@ -315,16 +321,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th); } - String failureMessage; - if (th != null) { - if (!StringUtils.isEmpty(th.getMessage())) { - failureMessage = th.getMessage(); - } else { - failureMessage = th.getClass().getSimpleName(); - } - } else { - failureMessage = null; - } + String failureMessage = getFailureMessage(th); nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), msg, failureMessage)); @@ -728,6 +725,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getSlackService(); } + @Override + public boolean isExternalNodeForceAck() { + return mainCtx.isExternalNodeForceAck(); + } + @Override public RuleEngineRpcService getRpcService() { return mainCtx.getTbRuleEngineDeviceRpcService(); @@ -850,6 +852,20 @@ class DefaultTbContext implements TbContext { } } + private static String getFailureMessage(Throwable th) { + String failureMessage; + if (th != null) { + if (!StringUtils.isEmpty(th.getMessage())) { + failureMessage = th.getMessage(); + } else { + failureMessage = th.getClass().getSimpleName(); + } + } else { + failureMessage = null; + } + return failureMessage; + } + private class SimpleTbQueueCallback implements TbQueueCallback { private final Runnable onSuccess; private final Consumer onFailure; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96e409373e..589540096c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -389,6 +389,10 @@ actors: queue_size: "${ACTORS_RULE_TRANSACTION_QUEUE_SIZE:15000}" # Time in milliseconds for transaction to complete duration: "${ACTORS_RULE_TRANSACTION_DURATION:60000}" + external: + # Force acknowledgement of the incoming message for external rule nodes to decrease processing latency. + # Enqueue the result of external node processing as a separate message to the rule engine. + force_ack: "${ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK:false}" rpc: max_retries: "${ACTORS_RPC_MAX_RETRIES:5}" sequential: "${ACTORS_RPC_SEQUENTIAL:false}" diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index f04104bc51..17bc7c0a8f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -277,6 +277,11 @@ public final class TbMsg implements Serializable { this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback); } + public TbMsg copyWithNewCtx() { + return new TbMsg(this.queueName, this.id, this.ts, this.type, this.originator, this.customerId, + this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx.copy(), TbMsgCallback.EMPTY); + } + public TbMsgCallback getCallback() { // May be null in case of deserialization; if (callback != null) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index b47264c280..88bf80e9f9 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -167,6 +167,8 @@ public interface TbContext { void enqueueForTellFailure(TbMsg msg, String failureMessage); + void enqueueForTellFailure(TbMsg tbMsg, Throwable t); + void enqueueForTellNext(TbMsg msg, String relationType); void enqueueForTellNext(TbMsg msg, Set relationTypes); @@ -302,6 +304,8 @@ public interface TbContext { SlackService getSlackService(); + boolean isExternalNodeForceAck(); + /** * Creates JS Script Engine * @deprecated diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java index f02434d4ee..bad37922e7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sns/TbSnsNode.java @@ -26,10 +26,11 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -51,7 +52,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSnsConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCIgd2lkdGg9IjQ4IiBoZWlnaHQ9IjQ4Ij48cGF0aCBkPSJNMTMuMjMgMTAuNTZWMTBjLTEuOTQgMC0zLjk5LjM5LTMuOTkgMi42NyAwIDEuMTYuNjEgMS45NSAxLjYzIDEuOTUuNzYgMCAxLjQzLS40NyAxLjg2LTEuMjIuNTItLjkzLjUtMS44LjUtMi44NG0yLjcgNi41M2MtLjE4LjE2LS40My4xNy0uNjMuMDYtLjg5LS43NC0xLjA1LTEuMDgtMS41NC0xLjc5LTEuNDcgMS41LTIuNTEgMS45NS00LjQyIDEuOTUtMi4yNSAwLTQuMDEtMS4zOS00LjAxLTQuMTcgMC0yLjE4IDEuMTctMy42NCAyLjg2LTQuMzggMS40Ni0uNjQgMy40OS0uNzYgNS4wNC0uOTNWNy41YzAtLjY2LjA1LTEuNDEtLjMzLTEuOTYtLjMyLS40OS0uOTUtLjctMS41LS43LTEuMDIgMC0xLjkzLjUzLTIuMTUgMS42MS0uMDUuMjQtLjI1LjQ4LS40Ny40OWwtMi42LS4yOGMtLjIyLS4wNS0uNDYtLjIyLS40LS41Ni42LTMuMTUgMy40NS00LjEgNi00LjEgMS4zIDAgMyAuMzUgNC4wMyAxLjMzQzE3LjExIDQuNTUgMTcgNi4xOCAxNyA3Ljk1djQuMTdjMCAxLjI1LjUgMS44MSAxIDIuNDguMTcuMjUuMjEuNTQgMCAuNzFsLTIuMDYgMS43OGgtLjAxIj48L3BhdGg+PHBhdGggZD0iTTIwLjE2IDE5LjU0QzE4IDIxLjE0IDE0LjgyIDIyIDEyLjEgMjJjLTMuODEgMC03LjI1LTEuNDEtOS44NS0zLjc2LS4yLS4xOC0uMDItLjQzLjI1LS4yOSAyLjc4IDEuNjMgNi4yNSAyLjYxIDkuODMgMi42MSAyLjQxIDAgNS4wNy0uNSA3LjUxLTEuNTMuMzctLjE2LjY2LjI0LjMyLjUxIj48L3BhdGg+PHBhdGggZD0iTTIxLjA3IDE4LjVjLS4yOC0uMzYtMS44NS0uMTctMi41Ny0uMDgtLjE5LjAyLS4yMi0uMTYtLjAzLS4zIDEuMjQtLjg4IDMuMjktLjYyIDMuNTMtLjMzLjI0LjMtLjA3IDIuMzUtMS4yNCAzLjMyLS4xOC4xNi0uMzUuMDctLjI2LS4xMS4yNi0uNjcuODUtMi4xNC41Ny0yLjV6Ij48L3BhdGg+PC9zdmc+" ) -public class TbSnsNode implements TbNode { +public class TbSnsNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String REQUEST_ID = "requestId"; @@ -62,6 +63,7 @@ public class TbSnsNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSnsNodeConfiguration.class); AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey()); AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); @@ -78,8 +80,9 @@ public class TbSnsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> ctx.tellFailure(processException(ctx, msg, t), t)); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java index 7386d30c7c..f8ebc8e295 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/aws/sqs/TbSqsNode.java @@ -31,6 +31,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -55,7 +56,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSqsConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCIgd2lkdGg9IjQ4IiBoZWlnaHQ9IjQ4Ij48cGF0aCBkPSJNMTMuMjMgMTAuNTZWMTBjLTEuOTQgMC0zLjk5LjM5LTMuOTkgMi42NyAwIDEuMTYuNjEgMS45NSAxLjYzIDEuOTUuNzYgMCAxLjQzLS40NyAxLjg2LTEuMjIuNTItLjkzLjUtMS44LjUtMi44NG0yLjcgNi41M2MtLjE4LjE2LS40My4xNy0uNjMuMDYtLjg5LS43NC0xLjA1LTEuMDgtMS41NC0xLjc5LTEuNDcgMS41LTIuNTEgMS45NS00LjQyIDEuOTUtMi4yNSAwLTQuMDEtMS4zOS00LjAxLTQuMTcgMC0yLjE4IDEuMTctMy42NCAyLjg2LTQuMzggMS40Ni0uNjQgMy40OS0uNzYgNS4wNC0uOTNWNy41YzAtLjY2LjA1LTEuNDEtLjMzLTEuOTYtLjMyLS40OS0uOTUtLjctMS41LS43LTEuMDIgMC0xLjkzLjUzLTIuMTUgMS42MS0uMDUuMjQtLjI1LjQ4LS40Ny40OWwtMi42LS4yOGMtLjIyLS4wNS0uNDYtLjIyLS40LS41Ni42LTMuMTUgMy40NS00LjEgNi00LjEgMS4zIDAgMyAuMzUgNC4wMyAxLjMzQzE3LjExIDQuNTUgMTcgNi4xOCAxNyA3Ljk1djQuMTdjMCAxLjI1LjUgMS44MSAxIDIuNDguMTcuMjUuMjEuNTQgMCAuNzFsLTIuMDYgMS43OGgtLjAxIj48L3BhdGg+PHBhdGggZD0iTTIwLjE2IDE5LjU0QzE4IDIxLjE0IDE0LjgyIDIyIDEyLjEgMjJjLTMuODEgMC03LjI1LTEuNDEtOS44NS0zLjc2LS4yLS4xOC0uMDItLjQzLjI1LS4yOSAyLjc4IDEuNjMgNi4yNSAyLjYxIDkuODMgMi42MSAyLjQxIDAgNS4wNy0uNSA3LjUxLTEuNTMuMzctLjE2LjY2LjI0LjMyLjUxIj48L3BhdGg+PHBhdGggZD0iTTIxLjA3IDE4LjVjLS4yOC0uMzYtMS44NS0uMTctMi41Ny0uMDgtLjE5LjAyLS4yMi0uMTYtLjAzLS4zIDEuMjQtLjg4IDMuMjktLjYyIDMuNTMtLjMzLjI0LjMtLjA3IDIuMzUtMS4yNCAzLjMyLS4xOC4xNi0uMzUuMDctLjI2LS4xMS4yNi0uNjcuODUtMi4xNC41Ny0yLjV6Ij48L3BhdGg+PC9zdmc+" ) -public class TbSqsNode implements TbNode { +public class TbSqsNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String REQUEST_ID = "requestId"; @@ -69,6 +70,7 @@ public class TbSqsNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSqsNodeConfiguration.class); AWSCredentials awsCredentials = new BasicAWSCredentials(this.config.getAccessKeyId(), this.config.getSecretAccessKey()); AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); @@ -85,8 +87,9 @@ public class TbSqsNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> ctx.tellFailure(processException(ctx, msg, t), t)); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java new file mode 100644 index 0000000000..f5f8c34ab3 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/external/TbAbstractExternalNode.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.external; + +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbRelationTypes; +import org.thingsboard.server.common.msg.TbMsg; + +public abstract class TbAbstractExternalNode implements TbNode { + + private boolean forceAck; + + public void init(TbContext ctx) { + this.forceAck = ctx.isExternalNodeForceAck(); + } + + protected void tellSuccess(TbContext ctx, TbMsg tbMsg) { + if (forceAck) { + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.SUCCESS); + } else { + ctx.tellSuccess(tbMsg); + } + } + + protected void tellFailure(TbContext ctx, TbMsg tbMsg, Throwable t) { + if (forceAck) { + if (t == null) { + ctx.enqueueForTellNext(tbMsg.copyWithNewCtx(), TbRelationTypes.FAILURE); + } else { + ctx.enqueueForTellFailure(tbMsg.copyWithNewCtx(), t); + } + } else { + if (t == null) { + ctx.tellNext(tbMsg, TbRelationTypes.FAILURE); + } else { + ctx.tellFailure(tbMsg, t); + } + } + } + + protected void ackIfNeeded(TbContext ctx, TbMsg msg) { + if (forceAck) { + ctx.ack(msg); + } + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index b0198e210c..3b927f05a6 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -53,7 +54,7 @@ import java.util.concurrent.TimeUnit; configDirective = "tbExternalNodePubSubConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyBpZD0iTGF5ZXJfMSIgZGF0YS1uYW1lPSJMYXllciAxIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSIxMjgiIGhlaWdodD0iMTI4IiB2aWV3Qm94PSIwIDAgMTI4IDEyOCI+Cjx0aXRsZT5DbG91ZCBQdWJTdWI8L3RpdGxlPgo8Zz4KPHBhdGggZD0iTTEyNi40Nyw1OC4xMmwtMjYuMy00NS43NEExMS41NiwxMS41NiwwLDAsMCw5MC4zMSw2LjVIMzcuN2ExMS41NSwxMS41NSwwLDAsMC05Ljg2LDUuODhMMS41Myw1OGExMS40OCwxMS40OCwwLDAsMCwwLDExLjQ0bDI2LjMsNDZhMTEuNzcsMTEuNzcsMCwwLDAsOS44Niw2LjA5SDkwLjNhMTEuNzMsMTEuNzMsMCwwLDAsOS44Ny02LjA2bDI2LjMtNDUuNzRBMTEuNzMsMTEuNzMsMCwwLDAsMTI2LjQ3LDU4LjEyWiIgc3R5bGU9ImZpbGw6ICM3MzViMmYiLz4KPHBhdGggZD0iTTg5LjIyLDQ3Ljc0LDgzLjM2LDQ5bC0xNC42LTE0LjZMNjQuMDksNDMuMSw2MS41NSw1My4ybDQuMjksNC4yOUw1Ny42LDU5LjE4LDQ2LjMsNDcuODhsLTcuNjcsNy4zOEw1Mi43Niw2OS4zN2wtMTUsMTEuOUw3OCwxMjEuNUg5MC4zYTExLjczLDExLjczLDAsMCwwLDkuODctNi4wNmwyMC43Mi0zNloiIHN0eWxlPSJvcGFjaXR5OiAwLjA3MDAwMDAwMDI5ODAyMztpc29sYXRpb246IGlzb2xhdGUiLz4KPHBhdGggZD0iTTgyLjg2LDQ3YTUuMzIsNS4zMiwwLDEsMS0xLjk1LDcuMjdBNS4zMiw1LjMyLDAsMCwxLDgyLjg2LDQ3IiBzdHlsZT0iZmlsbDogI2ZmZiIvPgo8cGF0aCBkPSJNMzkuODIsNTYuMThhNS4zMiw1LjMyLDAsMSwxLDcuMjctMS45NSw1LjMyLDUuMzIsMCwwLDEtNy4yNywxLjk1IiBzdHlsZT0iZmlsbDogI2ZmZiIvPgo8cGF0aCBkPSJNNjkuMzIsODguODVBNS4zMiw1LjMyLDAsMSwxLDY0LDgzLjUyYTUuMzIsNS4zMiwwLDAsMSw1LjMyLDUuMzIiIHN0eWxlPSJmaWxsOiAjZmZmIi8+CjxnPgo8cGF0aCBkPSJNNjQsNTIuOTRhMTEuMDYsMTEuMDYsMCwwLDEsMi40Ni4yOFYzOS4xNUg2MS41NFY1My4yMkExMS4wNiwxMS4wNiwwLDAsMSw2NCw1Mi45NFoiIHN0eWxlPSJmaWxsOiAjZmZmIi8+CjxwYXRoIGQ9Ik03NC41Nyw2Ny4yNmExMSwxMSwwLDAsMS0yLjQ3LDQuMjVsMTIuMTksNywyLjQ2LTQuMjZaIiBzdHlsZT0iZmlsbDogI2ZmZiIvPgo8cGF0aCBkPSJNNTMuNDMsNjcuMjZsLTEyLjE4LDcsMi40Niw0LjI2LDEyLjE5LTdBMTEsMTEsMCwwLDEsNTMuNDMsNjcuMjZaIiBzdHlsZT0iZmlsbDogI2ZmZiIvPgo8L2c+CjxwYXRoIGQ9Ik03Mi42LDY0QTguNiw4LjYsMCwxLDEsNjQsNTUuNCw4LjYsOC42LDAsMCwxLDcyLjYsNjQiIHN0eWxlPSJmaWxsOiAjZmZmIi8+CjxwYXRoIGQ9Ik0zOS4xLDcwLjU3YTYuNzYsNi43NiwwLDEsMS0yLjQ3LDkuMjMsNi43Niw2Ljc2LDAsMCwxLDIuNDctOS4yMyIgc3R5bGU9ImZpbGw6ICNmZmYiLz4KPHBhdGggZD0iTTgyLjE0LDgyLjI3YTYuNzYsNi43NiwwLDEsMSw5LjIzLTIuNDcsNi43NSw2Ljc1LDAsMCwxLTkuMjMsMi40NyIgc3R5bGU9ImZpbGw6ICNmZmYiLz4KPHBhdGggZD0iTTcwLjc2LDM5LjE1QTYuNzYsNi43NiwwLDEsMSw2NCwzMi4zOWE2Ljc2LDYuNzYsMCwwLDEsNi43Niw2Ljc2IiBzdHlsZT0iZmlsbDogI2ZmZiIvPgo8L2c+Cjwvc3ZnPgo=" ) -public class TbPubSubNode implements TbNode { +public class TbPubSubNode extends TbAbstractExternalNode { private static final String MESSAGE_ID = "messageId"; private static final String ERROR = "error"; @@ -63,8 +64,9 @@ public class TbPubSubNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); try { - this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); this.pubSubClient = initPubSubClient(); } catch (Exception e) { throw new TbNodeException(e); @@ -74,6 +76,7 @@ public class TbPubSubNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { publishMessage(ctx, msg); + ackIfNeeded(ctx, msg); } @Override @@ -101,12 +104,12 @@ public class TbPubSubNode implements TbNode { ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { public void onSuccess(String messageId) { TbMsg next = processPublishResult(ctx, msg, messageId); - ctx.tellSuccess(next); + tellSuccess(ctx, next); } public void onFailure(Throwable t) { TbMsg next = processException(ctx, msg, t); - ctx.tellFailure(next, t); + tellFailure(ctx, next, t); } }, ctx.getExternalCallExecutor()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java index de1abea224..ea8e3edb9d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java @@ -33,6 +33,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -56,7 +57,7 @@ import java.util.Properties; configDirective = "tbExternalNodeKafkaConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTUzOCIgaGVpZ2h0PSIyNTAwIiB2aWV3Qm94PSIwIDAgMjU2IDQxNiIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiBwcmVzZXJ2ZUFzcGVjdFJhdGlvPSJ4TWlkWU1pZCI+PHBhdGggZD0iTTIwMS44MTYgMjMwLjIxNmMtMTYuMTg2IDAtMzAuNjk3IDcuMTcxLTQwLjYzNCAxOC40NjFsLTI1LjQ2My0xOC4wMjZjMi43MDMtNy40NDIgNC4yNTUtMTUuNDMzIDQuMjU1LTIzLjc5NyAwLTguMjE5LTEuNDk4LTE2LjA3Ni00LjExMi0yMy40MDhsMjUuNDA2LTE3LjgzNWM5LjkzNiAxMS4yMzMgMjQuNDA5IDE4LjM2NSA0MC41NDggMTguMzY1IDI5Ljg3NSAwIDU0LjE4NC0yNC4zMDUgNTQuMTg0LTU0LjE4NCAwLTI5Ljg3OS0yNC4zMDktNTQuMTg0LTU0LjE4NC01NC4xODQtMjkuODc1IDAtNTQuMTg0IDI0LjMwNS01NC4xODQgNTQuMTg0IDAgNS4zNDguODA4IDEwLjUwNSAyLjI1OCAxNS4zODlsLTI1LjQyMyAxNy44NDRjLTEwLjYyLTEzLjE3NS0yNS45MTEtMjIuMzc0LTQzLjMzMy0yNS4xODJ2LTMwLjY0YzI0LjU0NC01LjE1NSA0My4wMzctMjYuOTYyIDQzLjAzNy01My4wMTlDMTI0LjE3MSAyNC4zMDUgOTkuODYyIDAgNjkuOTg3IDAgNDAuMTEyIDAgMTUuODAzIDI0LjMwNSAxNS44MDMgNTQuMTg0YzAgMjUuNzA4IDE4LjAxNCA0Ny4yNDYgNDIuMDY3IDUyLjc2OXYzMS4wMzhDMjUuMDQ0IDE0My43NTMgMCAxNzIuNDAxIDAgMjA2Ljg1NGMwIDM0LjYyMSAyNS4yOTIgNjMuMzc0IDU4LjM1NSA2OC45NHYzMi43NzRjLTI0LjI5OSA1LjM0MS00Mi41NTIgMjcuMDExLTQyLjU1MiA1Mi44OTQgMCAyOS44NzkgMjQuMzA5IDU0LjE4NCA1NC4xODQgNTQuMTg0IDI5Ljg3NSAwIDU0LjE4NC0yNC4zMDUgNTQuMTg0LTU0LjE4NCAwLTI1Ljg4My0xOC4yNTMtNDcuNTUzLTQyLjU1Mi01Mi44OTR2LTMyLjc3NWE2OS45NjUgNjkuOTY1IDAgMCAwIDQyLjYtMjQuNzc2bDI1LjYzMyAxOC4xNDNjLTEuNDIzIDQuODQtMi4yMiA5Ljk0Ni0yLjIyIDE1LjI0IDAgMjkuODc5IDI0LjMwOSA1NC4xODQgNTQuMTg0IDU0LjE4NCAyOS44NzUgMCA1NC4xODQtMjQuMzA1IDU0LjE4NC01NC4xODQgMC0yOS44NzktMjQuMzA5LTU0LjE4NC01NC4xODQtNTQuMTg0em0wLTEyNi42OTVjMTQuNDg3IDAgMjYuMjcgMTEuNzg4IDI2LjI3IDI2LjI3MXMtMTEuNzgzIDI2LjI3LTI2LjI3IDI2LjI3LTI2LjI3LTExLjc4Ny0yNi4yNy0yNi4yN2MwLTE0LjQ4MyAxMS43ODMtMjYuMjcxIDI2LjI3LTI2LjI3MXptLTE1OC4xLTQ5LjMzN2MwLTE0LjQ4MyAxMS43ODQtMjYuMjcgMjYuMjcxLTI2LjI3czI2LjI3IDExLjc4NyAyNi4yNyAyNi4yN2MwIDE0LjQ4My0xMS43ODMgMjYuMjctMjYuMjcgMjYuMjdzLTI2LjI3MS0xMS43ODctMjYuMjcxLTI2LjI3em01Mi41NDEgMzA3LjI3OGMwIDE0LjQ4My0xMS43ODMgMjYuMjctMjYuMjcgMjYuMjdzLTI2LjI3MS0xMS43ODctMjYuMjcxLTI2LjI3YzAtMTQuNDgzIDExLjc4NC0yNi4yNyAyNi4yNzEtMjYuMjdzMjYuMjcgMTEuNzg3IDI2LjI3IDI2LjI3em0tMjYuMjcyLTExNy45N2MtMjAuMjA1IDAtMzYuNjQyLTE2LjQzNC0zNi42NDItMzYuNjM4IDAtMjAuMjA1IDE2LjQzNy0zNi42NDIgMzYuNjQyLTM2LjY0MiAyMC4yMDQgMCAzNi42NDEgMTYuNDM3IDM2LjY0MSAzNi42NDIgMCAyMC4yMDQtMTYuNDM3IDM2LjYzOC0zNi42NDEgMzYuNjM4em0xMzEuODMxIDY3LjE3OWMtMTQuNDg3IDAtMjYuMjctMTEuNzg4LTI2LjI3LTI2LjI3MXMxMS43ODMtMjYuMjcgMjYuMjctMjYuMjcgMjYuMjcgMTEuNzg3IDI2LjI3IDI2LjI3YzAgMTQuNDgzLTExLjc4MyAyNi4yNzEtMjYuMjcgMjYuMjcxeiIvPjwvc3ZnPg==" ) -public class TbKafkaNode implements TbNode { +public class TbKafkaNode extends TbAbstractExternalNode { private static final String OFFSET = "offset"; private static final String PARTITION = "partition"; @@ -78,6 +79,7 @@ public class TbKafkaNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class); this.initError = null; Properties properties = new Properties(); @@ -129,6 +131,7 @@ public class TbKafkaNode implements TbNode { return null; }); } + ackIfNeeded(ctx, msg); } catch (Exception e) { ctx.tellFailure(msg, e); } @@ -164,11 +167,9 @@ public class TbKafkaNode implements TbNode { private void processRecord(TbContext ctx, TbMsg msg, RecordMetadata metadata, Exception e) { if (e == null) { - TbMsg next = processResponse(ctx, msg, metadata); - ctx.tellNext(next, TbRelationTypes.SUCCESS); + tellSuccess(ctx, processResponse(ctx, msg, metadata)); } else { - TbMsg next = processException(ctx, msg, e); - ctx.tellFailure(next, e); + tellFailure(ctx, processException(ctx, msg, e), e); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java index 3224465d68..3afea4f817 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -47,7 +48,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSendEmailConfig", icon = "send" ) -public class TbSendEmailNode implements TbNode { +public class TbSendEmailNode extends TbAbstractExternalNode { private static final String MAIL_PROP = "mail."; static final String SEND_EMAIL_TYPE = "SEND_EMAIL"; @@ -58,8 +59,9 @@ public class TbSendEmailNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class); try { - this.config = TbNodeUtils.convert(configuration, TbSendEmailNodeConfiguration.class); if (!this.config.isUseSystemSmtpSettings()) { mailSender = createMailSender(); } @@ -77,8 +79,9 @@ public class TbSendEmailNode implements TbNode { sendEmail(ctx, msg, email); return null; }), - ok -> ctx.tellSuccess(msg), - fail -> ctx.tellFailure(msg, fail)); + ok -> tellSuccess(ctx, msg), + fail -> tellFailure(ctx, msg, fail)); + ackIfNeeded(ctx, msg); } catch (Exception ex) { ctx.tellFailure(msg, ex); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java index 478e733958..8fac7b1683 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/TbMqttNode.java @@ -32,6 +32,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.credentials.BasicCredentials; import org.thingsboard.rule.engine.credentials.ClientCredentials; import org.thingsboard.rule.engine.credentials.CredentialsType; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentType; @@ -55,7 +56,7 @@ import java.util.concurrent.TimeoutException; configDirective = "tbExternalNodeMqttConfig", icon = "call_split" ) -public class TbMqttNode implements TbNode { +public class TbMqttNode extends TbAbstractExternalNode { private static final Charset UTF8 = Charset.forName("UTF-8"); @@ -67,8 +68,9 @@ public class TbMqttNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); try { - this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); this.mqttClient = initClient(ctx); } catch (Exception e) { throw new TbNodeException(e); @@ -81,13 +83,13 @@ public class TbMqttNode implements TbNode { this.mqttClient.publish(topic, Unpooled.wrappedBuffer(msg.getData().getBytes(UTF8)), MqttQoS.AT_LEAST_ONCE, mqttNodeConfiguration.isRetainedMessage()) .addListener(future -> { if (future.isSuccess()) { - ctx.tellSuccess(msg); + tellSuccess(ctx, msg); } else { - TbMsg next = processException(ctx, msg, future.cause()); - ctx.tellFailure(next, future.cause()); + tellFailure(ctx, processException(ctx, msg, future.cause()), future.cause()); } } ); + ackIfNeeded(ctx, msg); } private TbMsg processException(TbContext ctx, TbMsg origMsg, Throwable e) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java index 5720f66038..3ea96fa967 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mqtt/azure/TbAzureIotHubNode.java @@ -48,8 +48,9 @@ import javax.net.ssl.SSLException; public class TbAzureIotHubNode extends TbMqttNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); + this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); try { - this.mqttNodeConfiguration = TbNodeUtils.convert(configuration, TbMqttNodeConfiguration.class); mqttNodeConfiguration.setPort(8883); mqttNodeConfiguration.setCleanSession(true); ClientCredentials credentials = mqttNodeConfiguration.getCredentials(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java index 057702cb2b..bf57628d64 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbNotificationNode.java @@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.notification.NotificationRequestConfig; import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo; @@ -42,12 +43,13 @@ import java.util.concurrent.ExecutionException; configDirective = "tbExternalNodeNotificationConfig", icon = "notifications" ) -public class TbNotificationNode implements TbNode { +public class TbNotificationNode extends TbAbstractExternalNode { private TbNotificationNodeConfiguration config; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbNotificationNodeConfiguration.class); } @@ -69,15 +71,16 @@ public class TbNotificationNode implements TbNode { .originatorEntityId(ctx.getSelf().getRuleChainId()) .build(); - DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> { - return ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { - TbMsgMetaData metaData = msg.getMetaData().copy(); - metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); - ctx.tellSuccess(TbMsg.transformMsg(msg, metaData)); - }); - }), - r -> {}, - e -> ctx.tellFailure(msg, e)); + DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> + ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { + TbMsgMetaData metaData = msg.getMetaData().copy(); + metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); + tellSuccess(ctx, TbMsg.transformMsg(msg, metaData)); + })), + r -> { + }, + e -> tellFailure(ctx, msg, e)); + ackIfNeeded(ctx, msg); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java index 544a864931..fd56043848 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/notification/TbSlackNode.java @@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -37,12 +38,13 @@ import java.util.concurrent.ExecutionException; configDirective = "tbExternalNodeSlackConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAyNCAyNCI+PHBhdGggZD0iTTYsMTVBMiwyIDAgMCwxIDQsMTdBMiwyIDAgMCwxIDIsMTVBMiwyIDAgMCwxIDQsMTNINlYxNU03LDE1QTIsMiAwIDAsMSA5LDEzQTIsMiAwIDAsMSAxMSwxNVYyMEEyLDIgMCAwLDEgOSwyMkEyLDIgMCAwLDEgNywyMFYxNU05LDdBMiwyIDAgMCwxIDcsNUEyLDIgMCAwLDEgOSwzQTIsMiAwIDAsMSAxMSw1VjdIOU05LDhBMiwyIDAgMCwxIDExLDEwQTIsMiAwIDAsMSA5LDEySDRBMiwyIDAgMCwxIDIsMTBBMiwyIDAgMCwxIDQsOEg5TTE3LDEwQTIsMiAwIDAsMSAxOSw4QTIsMiAwIDAsMSAyMSwxMEEyLDIgMCAwLDEgMTksMTJIMTdWMTBNMTYsMTBBMiwyIDAgMCwxIDE0LDEyQTIsMiAwIDAsMSAxMiwxMFY1QTIsMiAwIDAsMSAxNCwzQTIsMiAwIDAsMSAxNiw1VjEwTTE0LDE4QTIsMiAwIDAsMSAxNiwyMEEyLDIgMCAwLDEgMTQsMjJBMiwyIDAgMCwxIDEyLDIwVjE4SDE0TTE0LDE3QTIsMiAwIDAsMSAxMiwxNUEyLDIgMCAwLDEgMTQsMTNIMTlBMiwyIDAgMCwxIDIxLDE1QTIsMiAwIDAsMSAxOSwxN0gxNFoiIC8+PC9zdmc+" ) -public class TbSlackNode implements TbNode { +public class TbSlackNode extends TbAbstractExternalNode { private TbSlackNodeConfiguration config; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbSlackNodeConfiguration.class); } @@ -62,8 +64,9 @@ public class TbSlackNode implements TbNode { DonAsynchron.withCallback(ctx.getExternalCallExecutor().executeAsync(() -> { ctx.getSlackService().sendMessage(ctx.getTenantId(), token, config.getConversation().getId(), message); }), - r -> ctx.tellSuccess(msg), - e -> ctx.tellFailure(msg, e)); + r -> tellSuccess(ctx, msg), + e -> tellFailure(ctx, msg, e)); + ackIfNeeded(ctx, msg); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java index 41dffe2fd8..4ae634902f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rabbitmq/TbRabbitMqNode.java @@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -48,7 +49,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeRabbitMqConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHhtbDpzcGFjZT0icHJlc2VydmUiIHZlcnNpb249IjEuMSIgeT0iMHB4IiB4PSIwcHgiIHZpZXdCb3g9IjAgMCAxMDAwIDEwMDAiPjxwYXRoIHN0cm9rZS13aWR0aD0iLjg0OTU2IiBkPSJtODYwLjQ3IDQxNi4zMmgtMjYyLjAxYy0xMi45MTMgMC0yMy42MTgtMTAuNzA0LTIzLjYxOC0yMy42MTh2LTI3Mi43MWMwLTIwLjMwNS0xNi4yMjctMzYuMjc2LTM2LjI3Ni0zNi4yNzZoLTkzLjc5MmMtMjAuMzA1IDAtMzYuMjc2IDE2LjIyNy0zNi4yNzYgMzYuMjc2djI3MC44NGMtMC4yNTQ4NyAxNC4xMDMtMTEuNDY5IDI1LjU3Mi0yNS43NDIgMjUuNTcybC04NS42MzYgMC42Nzk2NWMtMTQuMTAzIDAtMjUuNTcyLTExLjQ2OS0yNS41NzItMjUuNTcybDAuNjc5NjUtMjcxLjUyYzAtMjAuMzA1LTE2LjIyNy0zNi4yNzYtMzYuMjc2LTM2LjI3NmgtOTMuNTM3Yy0yMC4zMDUgMC0zNi4yNzYgMTYuMjI3LTM2LjI3NiAzNi4yNzZ2NzYzLjg0YzAgMTguMDk2IDE0Ljc4MiAzMi40NTMgMzIuNDUzIDMyLjQ1M2g3MjIuODFjMTguMDk2IDAgMzIuNDUzLTE0Ljc4MiAzMi40NTMtMzIuNDUzdi00MzUuMzFjLTEuMTg5NC0xOC4xODEtMTUuMjkyLTMyLjE5OC0zMy4zODgtMzIuMTk4em0tMTIyLjY4IDI4Ny4wN2MwIDIzLjYxOC0xOC44NiA0Mi40NzgtNDIuNDc4IDQyLjQ3OGgtNzMuOTk3Yy0yMy42MTggMC00Mi40NzgtMTguODYtNDIuNDc4LTQyLjQ3OHYtNzQuMjUyYzAtMjMuNjE4IDE4Ljg2LTQyLjQ3OCA0Mi40NzgtNDIuNDc4aDczLjk5N2MyMy42MTggMCA0Mi40NzggMTguODYgNDIuNDc4IDQyLjQ3OHoiLz48L3N2Zz4=" ) -public class TbRabbitMqNode implements TbNode { +public class TbRabbitMqNode extends TbAbstractExternalNode { private static final Charset UTF8 = Charset.forName("UTF-8"); @@ -61,6 +62,7 @@ public class TbRabbitMqNode implements TbNode { @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbRabbitMqNodeConfiguration.class); ConnectionFactory factory = new ConnectionFactory(); factory.setHost(this.config.getHost()); @@ -83,11 +85,9 @@ public class TbRabbitMqNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(publishMessageAsync(ctx, msg), - ctx::tellSuccess, - t -> { - TbMsg next = processException(ctx, msg, t); - ctx.tellFailure(next, t); - }); + m -> tellSuccess(ctx, m), + t -> tellFailure(ctx, processException(ctx, msg, t), t)); + ackIfNeeded(ctx, msg); } private ListenableFuture publishMessageAsync(TbContext ctx, TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index 6f9541f5b1..28e221b0b1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; @Data @Slf4j @@ -182,14 +183,16 @@ public class TbHttpClient { } } - public void processMessage(TbContext ctx, TbMsg msg) { + public void processMessage(TbContext ctx, TbMsg msg, + Consumer onSuccess, + BiConsumer onFailure) { String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg); HttpHeaders headers = prepareHeaders(msg); HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); HttpEntity entity; - if(HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) || - HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) || - config.isIgnoreRequestBody()) { + if (HttpMethod.GET.equals(method) || HttpMethod.HEAD.equals(method) || + HttpMethod.OPTIONS.equals(method) || HttpMethod.TRACE.equals(method) || + config.isIgnoreRequestBody()) { entity = new HttpEntity<>(headers); } else { entity = new HttpEntity<>(getData(msg), headers); @@ -198,21 +201,18 @@ public class TbHttpClient { URI uri = buildEncodedUri(endpointUrl); ListenableFuture> future = httpClient.exchange( uri, method, entity, String.class); - future.addCallback(new ListenableFutureCallback>() { + future.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(Throwable throwable) { - TbMsg next = processException(ctx, msg, throwable); - ctx.tellFailure(next, throwable); + onFailure.accept(processException(ctx, msg, throwable), throwable); } @Override public void onSuccess(ResponseEntity responseEntity) { if (responseEntity.getStatusCode().is2xxSuccessful()) { - TbMsg next = processResponse(ctx, msg, responseEntity); - ctx.tellSuccess(next); + onSuccess.accept(processResponse(ctx, msg, responseEntity)); } else { - TbMsg next = processFailureResponse(ctx, msg, responseEntity); - ctx.tellNext(next, TbRelationTypes.FAILURE); + onFailure.accept(processFailureResponse(ctx, msg, responseEntity), null); } } }); @@ -248,7 +248,7 @@ public class TbHttpClient { if (config.isTrimDoubleQuotes()) { final String dataBefore = data; - data = data.replaceAll("^\"|\"$", "");; + data = data.replaceAll("^\"|\"$", ""); log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java index d2356c69f7..94b0e5d078 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java @@ -22,6 +22,7 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -43,24 +44,26 @@ import org.thingsboard.server.common.msg.TbMsg; configDirective = "tbExternalNodeRestApiCallConfig", iconUrl = "data:image/svg+xml;base64,PHN2ZyBzdHlsZT0iZW5hYmxlLWJhY2tncm91bmQ6bmV3IDAgMCA1MTIgNTEyIiB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHhtbDpzcGFjZT0icHJlc2VydmUiIHZpZXdCb3g9IjAgMCA1MTIgNTEyIiB2ZXJzaW9uPSIxLjEiIHk9IjBweCIgeD0iMHB4Ij48ZyB0cmFuc2Zvcm09Im1hdHJpeCguOTQ5NzUgMCAwIC45NDk3NSAxNy4xMiAyNi40OTIpIj48cGF0aCBkPSJtMTY5LjExIDEwOC41NGMtOS45MDY2IDAuMDczNC0xOS4wMTQgNi41NzI0LTIyLjAxNCAxNi40NjlsLTY5Ljk5MyAyMzEuMDhjLTMuNjkwNCAxMi4xODEgMy4yODkyIDI1LjIyIDE1LjQ2OSAyOC45MSAyLjIyNTkgMC42NzQ4MSA0LjQ5NjkgMSA2LjcyODUgMSA5Ljk3MjEgMCAxOS4xNjUtNi41MTUzIDIyLjE4Mi0xNi40NjdhNi41MjI0IDYuNTIyNCAwIDAgMCAwLjAwMiAtMC4wMDJsNjkuOTktMjMxLjA3YTYuNTIyNCA2LjUyMjQgMCAwIDAgMCAtMC4wMDJjMy42ODU1LTEyLjE4MS0zLjI4Ny0yNS4yMjUtMTUuNDcxLTI4LjkxMi0yLjI4MjUtMC42OTE0NS00LjYxMTYtMS4wMTY5LTYuODk4NC0xem04NC45ODggMGMtOS45MDQ4IDAuMDczNC0xOS4wMTggNi41Njc1LTIyLjAxOCAxNi40NjlsLTY5Ljk4NiAyMzEuMDhjLTMuNjg5OCAxMi4xNzkgMy4yODUzIDI1LjIxNyAxNS40NjUgMjguOTA4IDIuMjI5NyAwLjY3NjQ3IDQuNTAwOCAxLjAwMiA2LjczMjQgMS4wMDIgOS45NzIxIDAgMTkuMTY1LTYuNTE1MyAyMi4xODItMTYuNDY3YTYuNTIyNCA2LjUyMjQgMCAwIDAgMC4wMDIgLTAuMDAybDY5Ljk4OC0yMzEuMDdjMy42OTA4LTEyLjE4MS0zLjI4NTItMjUuMjIzLTE1LjQ2Ny0yOC45MTItMi4yODE0LTAuNjkyMzEtNC42MTA4LTEuMDE4OS02Ljg5ODQtMS4wMDJ6bS0yMTcuMjkgNDIuMjNjLTEyLjcyOS0wLjAwMDg3LTIzLjE4OCAxMC40NTYtMjMuMTg4IDIzLjE4NiAwLjAwMSAxMi43MjggMTAuNDU5IDIzLjE4NiAyMy4xODggMjMuMTg2IDEyLjcyNy0wLjAwMSAyMy4xODMtMTAuNDU5IDIzLjE4NC0yMy4xODYgMC4wMDA4NzYtMTIuNzI4LTEwLjQ1Ni0yMy4xODUtMjMuMTg0LTIzLjE4NnptMCAxNDYuNjRjLTEyLjcyNy0wLjAwMDg3LTIzLjE4NiAxMC40NTUtMjMuMTg4IDIzLjE4NC0wLjAwMDg3MyAxMi43MjkgMTAuNDU4IDIzLjE4OCAyMy4xODggMjMuMTg4IDEyLjcyOC0wLjAwMSAyMy4xODQtMTAuNDYgMjMuMTg0LTIzLjE4OC0wLjAwMS0xMi43MjYtMTAuNDU3LTIzLjE4My0yMy4xODQtMjMuMTg0em0yNzAuNzkgNDIuMjExYy0xMi43MjcgMC0yMy4xODQgMTAuNDU3LTIzLjE4NCAyMy4xODRzMTAuNDU1IDIzLjE4OCAyMy4xODQgMjMuMTg4aDE1NC45OGMxMi43MjkgMCAyMy4xODYtMTAuNDYgMjMuMTg2LTIzLjE4OCAwLjAwMS0xMi43MjgtMTAuNDU4LTIzLjE4NC0yMy4xODYtMjMuMTg0eiIgdHJhbnNmb3JtPSJtYXRyaXgoMS4wMzc2IDAgMCAxLjAzNzYgLTcuNTY3NiAtMTQuOTI1KSIgc3Ryb2tlLXdpZHRoPSIxLjI2OTMiLz48L2c+PC9zdmc+" ) -public class TbRestApiCallNode implements TbNode { +public class TbRestApiCallNode extends TbAbstractExternalNode { - private boolean useRedisQueueForMsgPersistence; protected TbHttpClient httpClient; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); TbRestApiCallNodeConfiguration config = TbNodeUtils.convert(configuration, TbRestApiCallNodeConfiguration.class); httpClient = new TbHttpClient(config, ctx.getSharedEventLoop()); - useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence(); - if (useRedisQueueForMsgPersistence) { + if (config.isUseRedisQueueForMsgPersistence()) { log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId()); } } @Override public void onMsg(TbContext ctx, TbMsg msg) { - httpClient.processMessage(ctx, msg); + httpClient.processMessage(ctx, msg, + m -> tellSuccess(ctx, m), + (m, t) -> tellFailure(ctx, m, t)); + ackIfNeeded(ctx, msg); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index e92b15780a..f6cbccd4d4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -23,6 +23,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.sms.SmsSender; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -39,13 +40,14 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback; configDirective = "tbExternalNodeSendSmsConfig", icon = "sms" ) -public class TbSendSmsNode implements TbNode { +public class TbSendSmsNode extends TbAbstractExternalNode { private TbSendSmsNodeConfiguration config; private SmsSender smsSender; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + super.init(ctx); try { this.config = TbNodeUtils.convert(configuration, TbSendSmsNodeConfiguration.class); if (!this.config.isUseSystemSmsSettings()) { @@ -63,8 +65,9 @@ public class TbSendSmsNode implements TbNode { sendSms(ctx, msg); return null; }), - ok -> ctx.tellSuccess(msg), - fail -> ctx.tellFailure(msg, fail)); + ok -> tellSuccess(ctx, msg), + fail -> tellFailure(ctx, msg, fail)); + ackIfNeeded(ctx, msg); } catch (Exception ex) { ctx.tellFailure(msg, ex); } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java index 14ab6cfbbf..7b6a54ae0b 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/rest/TbHttpClientTest.java @@ -167,7 +167,9 @@ public class TbHttpClientTest { capturedData.capture() )).thenReturn(successMsg); - httpClient.processMessage(ctx, msg); + httpClient.processMessage(ctx, msg, + m -> ctx.tellSuccess(msg), + (m, t) -> ctx.tellFailure(m, t)); Awaitility.await() .atMost(30, TimeUnit.SECONDS)