added ability to perform calculations on the last records
This commit is contained in:
parent
4fed2cd416
commit
86569c312e
@ -51,7 +51,7 @@ public class CalculatedFieldCtx {
|
|||||||
this.output = configuration.getOutput();
|
this.output = configuration.getOutput();
|
||||||
this.expression = configuration.getExpression();
|
this.expression = configuration.getExpression();
|
||||||
this.tbelInvokeService = tbelInvokeService;
|
this.tbelInvokeService = tbelInvokeService;
|
||||||
if (CalculatedFieldType.SCRIPT.equals(calculatedField.getType())) {
|
if (!CalculatedFieldType.SIMPLE.equals(calculatedField.getType())) {
|
||||||
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
|
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ public class LastRecordsArgumentEntry implements ArgumentEntry {
|
|||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
@Override
|
@Override
|
||||||
public Object getValue() {
|
public Object getValue() {
|
||||||
return tsRecords.values();
|
return tsRecords;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,13 +17,13 @@ package org.thingsboard.server.service.cf.ctx.state;
|
|||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
import org.thingsboard.server.common.data.cf.configuration.Argument;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Output;
|
import org.thingsboard.server.common.data.cf.configuration.Output;
|
||||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||||
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
@ -56,23 +56,24 @@ public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
|
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
|
||||||
Map<String, Object> resultMap = new HashMap<>();
|
if (isValid(ctx.getArguments())) {
|
||||||
arguments.forEach((key, argumentEntry) -> {
|
arguments.forEach((key, argumentEntry) -> {
|
||||||
Argument argument = ctx.getArguments().get(key);
|
Argument argument = ctx.getArguments().get(key);
|
||||||
TreeMap<Long, Object> tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords();
|
TreeMap<Long, Object> tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords();
|
||||||
if (tsRecords.size() > argument.getLimit()) {
|
if (tsRecords.size() > argument.getLimit()) {
|
||||||
tsRecords.pollFirstEntry();
|
tsRecords.pollFirstEntry();
|
||||||
}
|
}
|
||||||
long necessaryIntervalTs = calculateIntervalStart(System.currentTimeMillis(), argument.getTimeWindow());
|
tsRecords.entrySet().removeIf(tsRecord -> tsRecord.getKey() < System.currentTimeMillis() - argument.getTimeWindow());
|
||||||
tsRecords.entrySet().removeIf(tsRecord -> calculateIntervalStart(tsRecord.getKey(), argument.getTimeWindow()) < necessaryIntervalTs);
|
});
|
||||||
resultMap.put(key, tsRecords);
|
Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray();
|
||||||
});
|
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
|
||||||
Output output = ctx.getOutput();
|
Output output = ctx.getOutput();
|
||||||
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), resultMap));
|
return Futures.transform(resultFuture,
|
||||||
}
|
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
|
||||||
|
MoreExecutors.directExecutor()
|
||||||
private long calculateIntervalStart(long ts, long interval) {
|
);
|
||||||
return (ts / interval) * interval;
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,10 +37,10 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
|
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
|
||||||
Output output = ctx.getOutput();
|
|
||||||
if (isValid(ctx.getArguments())) {
|
if (isValid(ctx.getArguments())) {
|
||||||
Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray();
|
Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray();
|
||||||
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
|
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
|
||||||
|
Output output = ctx.getOutput();
|
||||||
return Futures.transform(resultFuture,
|
return Futures.transform(resultFuture,
|
||||||
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
|
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
|
||||||
MoreExecutors.directExecutor()
|
MoreExecutors.directExecutor()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user