From 86569c312e96e297ad1de55b31c681bf32c36a28 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 10 Dec 2024 12:21:13 +0200 Subject: [PATCH] added ability to perform calculations on the last records --- .../cf/ctx/state/CalculatedFieldCtx.java | 2 +- .../ctx/state/LastRecordsArgumentEntry.java | 2 +- .../LastRecordsCalculatedFieldState.java | 37 ++++++++++--------- .../ctx/state/ScriptCalculatedFieldState.java | 2 +- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 10395a0d5d..b436e0421e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -51,7 +51,7 @@ public class CalculatedFieldCtx { this.output = configuration.getOutput(); this.expression = configuration.getExpression(); this.tbelInvokeService = tbelInvokeService; - if (CalculatedFieldType.SCRIPT.equals(calculatedField.getType())) { + if (!CalculatedFieldType.SIMPLE.equals(calculatedField.getType())) { this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java index 93fabd3cb8..8b672d4de9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java @@ -37,7 +37,7 @@ public class LastRecordsArgumentEntry implements ArgumentEntry { @JsonIgnore @Override public Object getValue() { - return tsRecords.values(); + return tsRecords; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java index dd69790236..fbcf72fda9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java @@ -17,13 +17,13 @@ package org.thingsboard.server.service.cf.ctx.state; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Data; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.service.cf.CalculatedFieldResult; -import java.util.HashMap; import java.util.Map; import java.util.TreeMap; @@ -56,23 +56,24 @@ public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState { @Override public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { - Map resultMap = new HashMap<>(); - arguments.forEach((key, argumentEntry) -> { - Argument argument = ctx.getArguments().get(key); - TreeMap tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords(); - if (tsRecords.size() > argument.getLimit()) { - tsRecords.pollFirstEntry(); - } - long necessaryIntervalTs = calculateIntervalStart(System.currentTimeMillis(), argument.getTimeWindow()); - tsRecords.entrySet().removeIf(tsRecord -> calculateIntervalStart(tsRecord.getKey(), argument.getTimeWindow()) < necessaryIntervalTs); - resultMap.put(key, tsRecords); - }); - Output output = ctx.getOutput(); - return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), resultMap)); - } - - private long calculateIntervalStart(long ts, long interval) { - return (ts / interval) * interval; + if (isValid(ctx.getArguments())) { + arguments.forEach((key, argumentEntry) -> { + Argument argument = ctx.getArguments().get(key); + TreeMap tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords(); + if (tsRecords.size() > argument.getLimit()) { + tsRecords.pollFirstEntry(); + } + tsRecords.entrySet().removeIf(tsRecord -> tsRecord.getKey() < System.currentTimeMillis() - argument.getTimeWindow()); + }); + Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray(); + ListenableFuture> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args); + Output output = ctx.getOutput(); + return Futures.transform(resultFuture, + result -> new CalculatedFieldResult(output.getType(), output.getScope(), result), + MoreExecutors.directExecutor() + ); + } + return null; } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 99befa6e65..ba050d3b71 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -37,10 +37,10 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { @Override public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { - Output output = ctx.getOutput(); if (isValid(ctx.getArguments())) { Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray(); ListenableFuture> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args); + Output output = ctx.getOutput(); return Futures.transform(resultFuture, result -> new CalculatedFieldResult(output.getType(), output.getScope(), result), MoreExecutors.directExecutor()