merge function draft
This commit is contained in:
parent
0ec8d7cb46
commit
6265ff45a8
@ -138,6 +138,8 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
|
|||||||
parserConfig.registerDataType("TbelCfTsDoubleVal", TbelCfTsDoubleVal.class, TbelCfTsDoubleVal::memorySize);
|
parserConfig.registerDataType("TbelCfTsDoubleVal", TbelCfTsDoubleVal.class, TbelCfTsDoubleVal::memorySize);
|
||||||
parserConfig.registerDataType("TbelCfTsRollingData", TbelCfTsRollingData.class, TbelCfTsRollingData::memorySize);
|
parserConfig.registerDataType("TbelCfTsRollingData", TbelCfTsRollingData.class, TbelCfTsRollingData::memorySize);
|
||||||
parserConfig.registerDataType("TbTimeWindow", TbTimeWindow.class, TbTimeWindow::memorySize);
|
parserConfig.registerDataType("TbTimeWindow", TbTimeWindow.class, TbTimeWindow::memorySize);
|
||||||
|
parserConfig.registerDataType("TbelCfTsDoubleVal", TbelCfTsMultiDoubleVal.class, TbelCfTsMultiDoubleVal::memorySize);
|
||||||
|
|
||||||
TbUtils.register(parserConfig);
|
TbUtils.register(parserConfig);
|
||||||
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "tbel-executor"));
|
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "tbel-executor"));
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -35,4 +35,7 @@ public class TbTimeWindow implements TbelCfObject {
|
|||||||
return OBJ_SIZE;
|
return OBJ_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean matches(long ts) {
|
||||||
|
return ts >= startTs && ts < endTs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2025 The Thingsboard Authors
|
* Copyright © 2016-2025 The Thingsboard Authors
|
||||||
* <p>
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
@ -368,8 +368,16 @@ public class TbUtils {
|
|||||||
byte[].class, int.class)));
|
byte[].class, int.class)));
|
||||||
parserConfig.addImport("parseBinaryArrayToInt", new MethodStub(TbUtils.class.getMethod("parseBinaryArrayToInt",
|
parserConfig.addImport("parseBinaryArrayToInt", new MethodStub(TbUtils.class.getMethod("parseBinaryArrayToInt",
|
||||||
byte[].class, int.class, int.class)));
|
byte[].class, int.class, int.class)));
|
||||||
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("mergeCfTsRollingArgs",
|
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("merge",
|
||||||
TbelCfTsRollingArg.class, TbelCfTsRollingArg.class)));
|
TbelCfTsRollingArg.class, TbelCfTsRollingArg.class)));
|
||||||
|
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("merge",
|
||||||
|
TbelCfTsRollingArg.class, TbelCfTsRollingArg.class, TbTimeWindow.class)));
|
||||||
|
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("merge",
|
||||||
|
TbelCfTsRollingArg.class, TbelCfTsRollingArg.class, TbTimeWindow.class, Map.class)));
|
||||||
|
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("merge",
|
||||||
|
List.class, TbTimeWindow.class)));
|
||||||
|
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("merge",
|
||||||
|
List.class, TbTimeWindow.class, Map.class)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String btoa(String input) {
|
public static String btoa(String input) {
|
||||||
@ -1510,11 +1518,28 @@ public class TbUtils {
|
|||||||
return hex;
|
return hex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbelCfTsRollingData mergeCfTsRollingArgs(TbelCfTsRollingArg a, TbelCfTsRollingArg b) {
|
public static TbelCfTsRollingData merge(TbelCfTsRollingArg a, TbelCfTsRollingArg b) {
|
||||||
return mergeCfTsRollingArgs(Arrays.asList(a, b), null);
|
return merge(Arrays.asList(a, b), null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TbelCfTsRollingData mergeCfTsRollingArgs(List<TbelCfTsRollingArg> args, Map<String, Object> settings) {
|
public static TbelCfTsRollingData merge(TbelCfTsRollingArg a, TbelCfTsRollingArg b, TbTimeWindow timeWindow) {
|
||||||
|
return merge(Arrays.asList(a, b), timeWindow, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TbelCfTsRollingData merge(TbelCfTsRollingArg a, TbelCfTsRollingArg b, TbTimeWindow timeWindow, Map<String, Object> settings) {
|
||||||
|
return merge(Arrays.asList(a, b), timeWindow, settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TbelCfTsRollingData merge(List<TbelCfTsRollingArg> args, TbTimeWindow timeWindow) {
|
||||||
|
return merge(args, timeWindow, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TbelCfTsRollingData merge(List<TbelCfTsRollingArg> args, TbTimeWindow timeWindow, Map<String, Object> settings) {
|
||||||
|
boolean ignoreNaN = true;
|
||||||
|
if (settings != null && settings.containsKey("ignoreNaN")) {
|
||||||
|
ignoreNaN = Boolean.parseBoolean(settings.get("ignoreNaN").toString());
|
||||||
|
}
|
||||||
|
|
||||||
TreeSet<Long> allTimestamps = new TreeSet<>();
|
TreeSet<Long> allTimestamps = new TreeSet<>();
|
||||||
long startTs = Long.MAX_VALUE;
|
long startTs = Long.MAX_VALUE;
|
||||||
long endTs = Long.MIN_VALUE;
|
long endTs = Long.MIN_VALUE;
|
||||||
@ -1532,7 +1557,7 @@ public class TbUtils {
|
|||||||
double[] result = new double[args.size()];
|
double[] result = new double[args.size()];
|
||||||
Arrays.fill(result, Double.NaN);
|
Arrays.fill(result, Double.NaN);
|
||||||
|
|
||||||
var tw = new TbTimeWindow(startTs, endTs, allTimestamps.size());
|
var tw = timeWindow != null ? timeWindow : new TbTimeWindow(startTs, endTs, allTimestamps.size());
|
||||||
|
|
||||||
for (long ts : allTimestamps) {
|
for (long ts : allTimestamps) {
|
||||||
for (int i = 0; i < args.size(); i++) {
|
for (int i = 0; i < args.size(); i++) {
|
||||||
@ -1543,7 +1568,22 @@ public class TbUtils {
|
|||||||
lastIndex[i]++;
|
lastIndex[i]++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
data.add(new TbelCfTsMultiDoubleVal(ts, Arrays.copyOf(result, result.length)));
|
if (tw.matches(ts)) {
|
||||||
|
if (ignoreNaN) {
|
||||||
|
boolean skip = false;
|
||||||
|
for (int i = 0; i < args.size(); i++) {
|
||||||
|
if (Double.isNaN(result[i])) {
|
||||||
|
skip = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!skip) {
|
||||||
|
data.add(new TbelCfTsMultiDoubleVal(ts, Arrays.copyOf(result, result.length)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
data.add(new TbelCfTsMultiDoubleVal(ts, Arrays.copyOf(result, result.length)));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new TbelCfTsRollingData(tw, data);
|
return new TbelCfTsRollingData(tw, data);
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.script.api.tbel;
|
package org.thingsboard.script.api.tbel;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@ -25,6 +26,39 @@ public class TbelCfTsMultiDoubleVal implements TbelCfObject {
|
|||||||
private final long ts;
|
private final long ts;
|
||||||
private final double[] values;
|
private final double[] values;
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public double getV1() {
|
||||||
|
return getV(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public double getV2() {
|
||||||
|
return getV(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public double getV3() {
|
||||||
|
return getV(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public double getV4() {
|
||||||
|
return getV(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonIgnore
|
||||||
|
public double getV5() {
|
||||||
|
return getV(4);
|
||||||
|
}
|
||||||
|
|
||||||
|
private double getV(int idx) {
|
||||||
|
if (values.length < idx + 1) {
|
||||||
|
throw new IllegalArgumentException("Can't get value at index " + idx + ". There are " + values.length + " values present.");
|
||||||
|
} else {
|
||||||
|
return values[idx];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long memorySize() {
|
public long memorySize() {
|
||||||
return OBJ_SIZE + values.length * 8L;
|
return OBJ_SIZE + values.length * 8L;
|
||||||
|
|||||||
@ -1109,7 +1109,7 @@ public class TbUtilsTest {
|
|||||||
String validInput = Base64.getEncoder().encodeToString(new byte[]{1, 2, 3, 4, 5});
|
String validInput = Base64.getEncoder().encodeToString(new byte[]{1, 2, 3, 4, 5});
|
||||||
ExecutionArrayList<Byte> actual = TbUtils.base64ToBytesList(ctx, validInput);
|
ExecutionArrayList<Byte> actual = TbUtils.base64ToBytesList(ctx, validInput);
|
||||||
ExecutionArrayList<Byte> expected = new ExecutionArrayList<>(ctx);
|
ExecutionArrayList<Byte> expected = new ExecutionArrayList<>(ctx);
|
||||||
expected.addAll(List.of((byte) 1, (byte)2, (byte)3, (byte)4, (byte)5));
|
expected.addAll(List.of((byte) 1, (byte) 2, (byte) 3, (byte) 4, (byte) 5));
|
||||||
Assertions.assertEquals(expected, actual);
|
Assertions.assertEquals(expected, actual);
|
||||||
|
|
||||||
String emptyInput = Base64.getEncoder().encodeToString(new byte[]{});
|
String emptyInput = Base64.getEncoder().encodeToString(new byte[]{});
|
||||||
@ -1123,6 +1123,7 @@ public class TbUtilsTest {
|
|||||||
TbUtils.base64ToBytesList(ctx, null);
|
TbUtils.base64ToBytesList(ctx, null);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void bytesToHex_Test() {
|
public void bytesToHex_Test() {
|
||||||
byte[] bb = {(byte) 0xBB, (byte) 0xAA};
|
byte[] bb = {(byte) 0xBB, (byte) 0xAA};
|
||||||
@ -1136,6 +1137,75 @@ public class TbUtilsTest {
|
|||||||
Assertions.assertEquals(expected, actual);
|
Assertions.assertEquals(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void merge_two_rolling_args_ts_match_test() {
|
||||||
|
TbTimeWindow tw = new TbTimeWindow(0, 60000, 1000);
|
||||||
|
TbelCfTsRollingArg arg1 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(1000, 1), new TbelCfTsDoubleVal(5000, 2), new TbelCfTsDoubleVal(15000, 3)));
|
||||||
|
TbelCfTsRollingArg arg2 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(1000, 11), new TbelCfTsDoubleVal(5000, 12), new TbelCfTsDoubleVal(15000, 13)));
|
||||||
|
|
||||||
|
var result = TbUtils.merge(arg1, arg2);
|
||||||
|
Assertions.assertEquals(3, result.getSize());
|
||||||
|
Assertions.assertNotNull(result.getValues());
|
||||||
|
Assertions.assertNotNull(result.getValues().get(0));
|
||||||
|
Assertions.assertEquals(1000L, result.getValues().get(0).getTs());
|
||||||
|
Assertions.assertEquals(1, result.getValues().get(0).getValues()[0]);
|
||||||
|
Assertions.assertEquals(11, result.getValues().get(0).getValues()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void merge_two_rolling_args_with_timewindow_test() {
|
||||||
|
TbTimeWindow tw = new TbTimeWindow(0, 60000, 1000);
|
||||||
|
TbelCfTsRollingArg arg1 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(1000, 1), new TbelCfTsDoubleVal(5000, 2), new TbelCfTsDoubleVal(15000, 3)));
|
||||||
|
TbelCfTsRollingArg arg2 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(1000, 11), new TbelCfTsDoubleVal(5000, 12), new TbelCfTsDoubleVal(15000, 13)));
|
||||||
|
|
||||||
|
var result = TbUtils.merge(arg1, arg2, new TbTimeWindow(0, 10000, 1000));
|
||||||
|
Assertions.assertEquals(2, result.getSize());
|
||||||
|
Assertions.assertNotNull(result.getValues());
|
||||||
|
Assertions.assertNotNull(result.getValues().get(0));
|
||||||
|
Assertions.assertEquals(1000L, result.getValues().get(0).getTs());
|
||||||
|
Assertions.assertEquals(1, result.getValues().get(0).getValues()[0]);
|
||||||
|
Assertions.assertEquals(11, result.getValues().get(0).getValues()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void merge_two_rolling_args_ts_mismatch_default_test() {
|
||||||
|
TbTimeWindow tw = new TbTimeWindow(0, 60000, 1000);
|
||||||
|
TbelCfTsRollingArg arg1 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(100, 1), new TbelCfTsDoubleVal(5000, 2), new TbelCfTsDoubleVal(15000, 3)));
|
||||||
|
TbelCfTsRollingArg arg2 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(200, 11), new TbelCfTsDoubleVal(5000, 12), new TbelCfTsDoubleVal(15000, 13)));
|
||||||
|
|
||||||
|
var result = TbUtils.merge(arg1, arg2);
|
||||||
|
Assertions.assertEquals(3, result.getSize());
|
||||||
|
Assertions.assertNotNull(result.getValues());
|
||||||
|
|
||||||
|
TbelCfTsMultiDoubleVal item0 = result.getValues().get(0);
|
||||||
|
Assertions.assertNotNull(item0);
|
||||||
|
Assertions.assertEquals(200L, item0.getTs());
|
||||||
|
Assertions.assertEquals(1, item0.getValues()[0]);
|
||||||
|
Assertions.assertEquals(11, item0.getValues()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void merge_two_rolling_args_ts_mismatch_ignore_nan_disabled_test() {
|
||||||
|
TbTimeWindow tw = new TbTimeWindow(0, 60000, 1000);
|
||||||
|
TbelCfTsRollingArg arg1 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(100, 1), new TbelCfTsDoubleVal(5000, 2), new TbelCfTsDoubleVal(15000, 3)));
|
||||||
|
TbelCfTsRollingArg arg2 = new TbelCfTsRollingArg(tw, Arrays.asList(new TbelCfTsDoubleVal(200, 11), new TbelCfTsDoubleVal(5000, 12), new TbelCfTsDoubleVal(15000, 13)));
|
||||||
|
|
||||||
|
var result = TbUtils.merge(Arrays.asList(arg1, arg2), new TbTimeWindow(0, 60000, 1000), Collections.singletonMap("ignoreNaN", false));
|
||||||
|
Assertions.assertEquals(4, result.getSize());
|
||||||
|
Assertions.assertNotNull(result.getValues());
|
||||||
|
|
||||||
|
TbelCfTsMultiDoubleVal item0 = result.getValues().get(0);
|
||||||
|
Assertions.assertNotNull(item0);
|
||||||
|
Assertions.assertEquals(100L, item0.getTs());
|
||||||
|
Assertions.assertEquals(1, item0.getValues()[0]);
|
||||||
|
Assertions.assertEquals(Double.NaN, item0.getValues()[1]);
|
||||||
|
|
||||||
|
TbelCfTsMultiDoubleVal item1 = result.getValues().get(1);
|
||||||
|
Assertions.assertEquals(200L, item1.getTs());
|
||||||
|
Assertions.assertEquals(1, item1.getValues()[0]);
|
||||||
|
Assertions.assertEquals(11, item1.getValues()[1]);
|
||||||
|
}
|
||||||
|
|
||||||
private static List<Byte> toList(byte[] data) {
|
private static List<Byte> toList(byte[] data) {
|
||||||
List<Byte> result = new ArrayList<>(data.length);
|
List<Byte> result = new ArrayList<>(data.length);
|
||||||
for (Byte b : data) {
|
for (Byte b : data) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user