CF: merge function draft

This commit is contained in:
Andrii Shvaika 2025-02-28 17:46:25 +02:00
parent e6d69e3ed3
commit ac52098207
6 changed files with 143 additions and 9 deletions

View File

@ -136,6 +136,7 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
parserConfig.registerDataType("TbelCfSingleValueArg", TbelCfSingleValueArg.class, TbelCfSingleValueArg::memorySize);
parserConfig.registerDataType("TbelCfTsRollingArg", TbelCfTsRollingArg.class, TbelCfTsRollingArg::memorySize);
parserConfig.registerDataType("TbelCfTsDoubleVal", TbelCfTsDoubleVal.class, TbelCfTsDoubleVal::memorySize);
parserConfig.registerDataType("TbelCfTsRollingData", TbelCfTsRollingData.class, TbelCfTsRollingData::memorySize);
parserConfig.registerDataType("TbTimeWindow", TbTimeWindow.class, TbTimeWindow::memorySize);
TbUtils.register(parserConfig);
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "tbel-executor"));

View File

@ -17,9 +17,11 @@ package org.thingsboard.script.api.tbel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TbTimeWindow implements TbelCfObject {
public static final long OBJ_SIZE = 32L;

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -44,6 +44,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import static java.lang.Character.MAX_RADIX;
@ -367,6 +368,8 @@ public class TbUtils {
byte[].class, int.class)));
parserConfig.addImport("parseBinaryArrayToInt", new MethodStub(TbUtils.class.getMethod("parseBinaryArrayToInt",
byte[].class, int.class, int.class)));
parserConfig.addImport("merge", new MethodStub(TbUtils.class.getMethod("mergeCfTsRollingArgs",
TbelCfTsRollingArg.class, TbelCfTsRollingArg.class)));
}
public static String btoa(String input) {
@ -1506,5 +1509,45 @@ public class TbUtils {
}
return hex;
}
public static TbelCfTsRollingData mergeCfTsRollingArgs(TbelCfTsRollingArg a, TbelCfTsRollingArg b) {
return mergeCfTsRollingArgs(Arrays.asList(a, b), null);
}
public static TbelCfTsRollingData mergeCfTsRollingArgs(List<TbelCfTsRollingArg> args, Map<String, Object> settings) {
TreeSet<Long> allTimestamps = new TreeSet<>();
long startTs = Long.MAX_VALUE;
long endTs = Long.MIN_VALUE;
for (TbelCfTsRollingArg arg : args) {
for (TbelCfTsDoubleVal val : arg.getValues()) {
allTimestamps.add(val.getTs());
}
startTs = Math.min(startTs, arg.getTimeWindow().getStartTs());
endTs = Math.max(endTs, arg.getTimeWindow().getEndTs());
}
List<TbelCfTsMultiDoubleVal> data = new ArrayList<>();
int[] lastIndex = new int[args.size()];
double[] result = new double[args.size()];
Arrays.fill(result, Double.NaN);
var tw = new TbTimeWindow(startTs, endTs, allTimestamps.size());
for (long ts : allTimestamps) {
for (int i = 0; i < args.size(); i++) {
var arg = args.get(i);
var values = arg.getValues();
while (lastIndex[i] < values.size() && values.get(lastIndex[i]).getTs() <= ts) {
result[i] = values.get(lastIndex[i]).getValue();
lastIndex[i]++;
}
}
data.add(new TbelCfTsMultiDoubleVal(ts, Arrays.copyOf(result, result.length)));
}
return new TbelCfTsRollingData(tw, data);
}
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.script.api.tbel;
import lombok.Data;
@Data
public class TbelCfTsMultiDoubleVal implements TbelCfObject {
public static final long OBJ_SIZE = 32L; // Approximate calculation;
private final long ts;
private final double[] values;
@Override
public long memorySize() {
return OBJ_SIZE + values.length * 8L;
}
}

View File

@ -266,11 +266,6 @@ public class TbelCfTsRollingArg implements TbelCfArg, Iterable<TbelCfTsDoubleVal
return values.iterator();
}
@Override
public void forEach(Consumer<? super TbelCfTsDoubleVal> action) {
values.forEach(action);
}
@Override
public String getType() {
return "TS_ROLLING";

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.script.api.tbel;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import static org.thingsboard.script.api.tbel.TbelCfTsDoubleVal.OBJ_SIZE;
public class TbelCfTsRollingData implements TbelCfObject, Iterable<TbelCfTsMultiDoubleVal> {
@Getter
private final TbTimeWindow timeWindow;
@Getter
private final List<TbelCfTsMultiDoubleVal> values;
public TbelCfTsRollingData(TbTimeWindow timeWindow, List<TbelCfTsMultiDoubleVal> values) {
this.timeWindow = timeWindow;
this.values = Collections.unmodifiableList(values);
}
@Override
public long memorySize() {
return 12 + values.size() * OBJ_SIZE;
}
@JsonIgnore
public List<TbelCfTsMultiDoubleVal> getValue() {
return values;
}
@JsonIgnore
public int getSize() {
return values.size();
}
@Override
public Iterator<TbelCfTsMultiDoubleVal> iterator() {
return values.iterator();
}
}