added deletion from map if ts out of timewindow

This commit is contained in:
IrynaMatveieva 2024-11-27 12:10:04 +02:00
parent d684c8777a
commit f9db64a14d
5 changed files with 46 additions and 38 deletions

View File

@ -15,12 +15,14 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
@JsonTypeInfo(
@ -34,16 +36,18 @@ import java.util.stream.Collectors;
})
public interface ArgumentEntry {
@JsonIgnore
ArgumentType getType();
Object getValue();
static ArgumentEntry createSingleValueArgument(KvEntry kvEntry) {
return new SingleValueArgumentEntry(kvEntry.getValue());
return new SingleValueArgumentEntry(kvEntry);
}
static ArgumentEntry createLastRecordsArgument(List<TsKvEntry> kvEntries) {
return new LastRecordsArgumentEntry(kvEntries.stream() .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue)));
return new LastRecordsArgumentEntry(kvEntries.stream().
collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue, (oldValue, newValue) -> newValue, TreeMap::new)));
}
}

View File

@ -15,24 +15,26 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.TreeMap;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LastRecordsArgumentEntry implements ArgumentEntry {
private Map<Long, Object> tsRecords;
private TreeMap<Long, Object> tsRecords;
@Override
public ArgumentType getType() {
return ArgumentType.LAST_RECORDS;
}
@JsonIgnore
@Override
public Object getValue() {
return tsRecords.values();

View File

@ -19,16 +19,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.TreeMap;
@Data
public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
@ -44,16 +41,15 @@ public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
@Override
public void initState(Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
arguments = new HashMap<>();
arguments = new TreeMap<>();
}
argumentValues.forEach((key, argumentEntry) -> {
LastRecordsArgumentEntry existingArgumentEntry = (LastRecordsArgumentEntry)
arguments.computeIfAbsent(key, k -> new LastRecordsArgumentEntry(new HashMap<>()));
arguments.computeIfAbsent(key, k -> new LastRecordsArgumentEntry(new TreeMap<>()));
if (argumentEntry instanceof LastRecordsArgumentEntry lastRecordsArgumentEntry) {
existingArgumentEntry.getTsRecords().putAll(lastRecordsArgumentEntry.getTsRecords());
} else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry
&& singleValueArgumentEntry.getValue() instanceof TsKvEntry tsKvEntry) {
existingArgumentEntry.getTsRecords().put(tsKvEntry.getTs(), tsKvEntry.getValue());
} else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
existingArgumentEntry.getTsRecords().put(singleValueArgumentEntry.getTs(), singleValueArgumentEntry.getValue());
}
});
}
@ -61,26 +57,22 @@ public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
Map<String, Object> resultMap = new HashMap<>();
arguments.replaceAll((key, argumentEntry) -> {
int limit = ctx.getArguments().get(key).getLimit();
// TODO: implement removing if size > limit
// List<TsKvEntry> limitedEntries = entries.stream()
// .sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed())
// .limit(limit)
// .collect(Collectors.toList());
//
// Map<Long, Object> valueWithTs = limitedEntries.stream()
// .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue));
// resultMap.put(key, valueWithTs);
// return new LastRecordsArgumentEntry(limitedEntries);
return null;
arguments.forEach((key, argumentEntry) -> {
Argument argument = ctx.getArguments().get(key);
TreeMap<Long, Object> tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords();
if (tsRecords.size() > argument.getLimit()) {
tsRecords.pollFirstEntry();
}
long necessaryIntervalTs = calculateIntervalStart(System.currentTimeMillis(), argument.getTimeWindow());
tsRecords.entrySet().removeIf(tsRecord -> calculateIntervalStart(tsRecord.getKey(), argument.getTimeWindow()) < necessaryIntervalTs);
resultMap.put(key, tsRecords);
});
Output output = ctx.getOutput();
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), resultMap));
}
private long calculateIntervalStart(long ts, long interval) {
return (ts / interval) * interval;
}
}

View File

@ -21,9 +21,7 @@ import lombok.Data;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap;
@ -51,7 +49,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
customExpression.set(expr);
}
Map<String, Double> variables = new HashMap<>();
this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(((KvEntry) v.getValue()).getValueAsString())));
this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v.getValue().toString())));
expr.setVariables(variables);
double expressionResult = expr.evaluate();

View File

@ -15,17 +15,29 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SingleValueArgumentEntry implements ArgumentEntry {
private long ts;
private Object value;
public SingleValueArgumentEntry() {
}
public SingleValueArgumentEntry(KvEntry entry) {
if (entry instanceof TsKvEntry) {
this.ts = ((TsKvEntry) entry).getTs();
} else if (entry instanceof AttributeKvEntry) {
this.ts = ((AttributeKvEntry) entry).getLastUpdateTs();
}
this.value = entry.getValue();
}
@Override
public ArgumentType getType() {
return ArgumentType.SINGLE_VALUE;