removed CF type LAST_RECORDS and implemented this functionality as a script(added new argument type TS_ROLLING)

This commit is contained in:
IrynaMatveieva 2024-12-12 17:04:24 +02:00
parent 86569c312e
commit a73affea23
15 changed files with 64 additions and 172 deletions

View File

@ -73,12 +73,12 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -435,41 +435,41 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType())
? targetEntityId
: argumentEntityId;
if (CalculatedFieldType.LAST_RECORDS.equals(calculatedFieldCtx.getCfType())) {
return fetchLastRecords(tenantId, entityId, argument);
}
return fetchKvEntry(tenantId, entityId, argument);
}
private ListenableFuture<ArgumentEntry> fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) {
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
return switch (argument.getType()) {
case "TS_ROLLING" -> fetchTsRolling(tenantId, entityId, argument);
case "ATTRIBUTE" -> transformSingleValueArgument(
Futures.transform(
attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()),
result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))),
calculatedFieldCallbackExecutor)
);
case "TS_LATEST" -> transformSingleValueArgument(
Futures.transform(
timeseriesService.findLatest(tenantId, entityId, argument.getKey()),
result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))),
calculatedFieldCallbackExecutor));
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
};
}
private ListenableFuture<ArgumentEntry> fetchTsRolling(TenantId tenantId, EntityId entityId, Argument argument) {
long currentTime = System.currentTimeMillis();
long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow();
long startTs = currentTime - timeWindow;
int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit();
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
ListenableFuture<List<TsKvEntry>> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
return Futures.transform(lastRecordsFuture, ArgumentEntry::createLastRecordsArgument, calculatedFieldExecutor);
return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? ArgumentEntry.createTsRollingArgument(Collections.emptyList()) : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor);
}
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture = switch (argument.getType()) {
case "ATTRIBUTES" -> Futures.transform(
attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()),
result -> result.or(() -> Optional.of(
new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument))
)),
MoreExecutors.directExecutor());
case "TIME_SERIES" -> Futures.transform(
timeseriesService.findLatest(tenantId, entityId, argument.getKey()),
result -> result.or(() -> Optional.of(
new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument))
)),
calculatedFieldExecutor);
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
};
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldExecutor);
private ListenableFuture<ArgumentEntry> transformSingleValueArgument(ListenableFuture<Optional<? extends KvEntry>> kvEntryFuture) {
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldCallbackExecutor);
}
private KvEntry createDefaultKvEntry(Argument argument) {
@ -547,7 +547,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return switch (calculatedFieldType) {
case SIMPLE -> new SimpleCalculatedFieldState();
case SCRIPT -> new ScriptCalculatedFieldState();
case LAST_RECORDS -> new LastRecordsCalculatedFieldState();
};
}

View File

@ -32,7 +32,7 @@ import java.util.stream.Collectors;
)
@JsonSubTypes({
@JsonSubTypes.Type(value = SingleValueArgumentEntry.class, name = "SINGLE_VALUE"),
@JsonSubTypes.Type(value = LastRecordsArgumentEntry.class, name = "LAST_RECORDS")
@JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING")
})
public interface ArgumentEntry {
@ -45,8 +45,8 @@ public interface ArgumentEntry {
return new SingleValueArgumentEntry(kvEntry);
}
static ArgumentEntry createLastRecordsArgument(List<TsKvEntry> kvEntries) {
return new LastRecordsArgumentEntry(kvEntries.stream().
static ArgumentEntry createTsRollingArgument(List<TsKvEntry> kvEntries) {
return new TsRollingArgumentEntry(kvEntries.stream().
collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue, (oldValue, newValue) -> newValue, TreeMap::new)));
}

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.service.cf.ctx.state;
public enum ArgumentType {
SINGLE_VALUE, LAST_RECORDS
SINGLE_VALUE, TS_ROLLING
}

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf.ctx.state;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
@ -36,15 +35,22 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
if (arguments == null) {
arguments = new HashMap<>();
}
arguments.putAll(
argumentValues.entrySet().stream()
.peek(entry -> {
if (entry.getValue() instanceof LastRecordsArgumentEntry) {
throw new IllegalArgumentException("Last records argument entry is not allowed for single calculated field state");
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
argumentValues.forEach((key, argumentEntry) -> {
ArgumentEntry existingArgumentEntry = arguments.get(key);
if (existingArgumentEntry != null) {
if (existingArgumentEntry instanceof SingleValueArgumentEntry) {
arguments.put(key, argumentEntry);
} else if (existingArgumentEntry instanceof TsRollingArgumentEntry existingTsRollingArgumentEntry) {
if (argumentEntry instanceof TsRollingArgumentEntry tsRollingArgumentEntry) {
existingTsRollingArgumentEntry.getTsRecords().putAll(tsRollingArgumentEntry.getTsRecords());
} else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
existingTsRollingArgumentEntry.getTsRecords().put(singleValueArgumentEntry.getTs(), singleValueArgumentEntry.getValue());
}
}
} else {
arguments.put(key, argumentEntry);
}
});
}
}

View File

@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.Map;
@ -34,7 +33,6 @@ import java.util.Map;
@JsonSubTypes({
@JsonSubTypes.Type(value = SimpleCalculatedFieldState.class, name = "SIMPLE"),
@JsonSubTypes.Type(value = ScriptCalculatedFieldState.class, name = "SCRIPT"),
@JsonSubTypes.Type(value = LastRecordsCalculatedFieldState.class, name = "LAST_RECORDS")
})
public interface CalculatedFieldState {

View File

@ -1,79 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.Map;
import java.util.TreeMap;
@Data
public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
public LastRecordsCalculatedFieldState() {
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.LAST_RECORDS;
}
@Override
public void initState(Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
arguments = new TreeMap<>();
}
argumentValues.forEach((key, argumentEntry) -> {
LastRecordsArgumentEntry existingArgumentEntry = (LastRecordsArgumentEntry)
arguments.computeIfAbsent(key, k -> new LastRecordsArgumentEntry(new TreeMap<>()));
if (argumentEntry instanceof LastRecordsArgumentEntry lastRecordsArgumentEntry) {
existingArgumentEntry.getTsRecords().putAll(lastRecordsArgumentEntry.getTsRecords());
} else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
existingArgumentEntry.getTsRecords().put(singleValueArgumentEntry.getTs(), singleValueArgumentEntry.getValue());
}
});
}
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
if (isValid(ctx.getArguments())) {
arguments.forEach((key, argumentEntry) -> {
Argument argument = ctx.getArguments().get(key);
TreeMap<Long, Object> tsRecords = ((LastRecordsArgumentEntry) argumentEntry).getTsRecords();
if (tsRecords.size() > argument.getLimit()) {
tsRecords.pollFirstEntry();
}
tsRecords.entrySet().removeIf(tsRecord -> tsRecord.getKey() < System.currentTimeMillis() - argument.getTimeWindow());
});
Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray();
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
Output output = ctx.getOutput();
return Futures.transform(resultFuture,
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
MoreExecutors.directExecutor()
);
}
return null;
}
}

View File

@ -21,10 +21,12 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.Map;
import java.util.TreeMap;
@Data
@Slf4j
@ -38,6 +40,16 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx) {
if (isValid(ctx.getArguments())) {
arguments.forEach((key, argumentEntry) -> {
if (argumentEntry instanceof TsRollingArgumentEntry) {
Argument argument = ctx.getArguments().get(key);
TreeMap<Long, Object> tsRecords = ((TsRollingArgumentEntry) argumentEntry).getTsRecords();
if (tsRecords.size() > argument.getLimit()) {
tsRecords.pollFirstEntry();
}
tsRecords.entrySet().removeIf(tsRecord -> tsRecord.getKey() < System.currentTimeMillis() - argument.getTimeWindow());
}
});
Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray();
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
Output output = ctx.getOutput();
@ -46,7 +58,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
MoreExecutors.directExecutor()
);
}
return null;
return Futures.immediateFuture(null);
}
}

View File

@ -57,7 +57,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
Output output = ctx.getOutput();
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), Map.of(output.getName(), expressionResult)));
}
return null;
return Futures.immediateFuture(null);
}
}

View File

@ -25,13 +25,13 @@ import java.util.TreeMap;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LastRecordsArgumentEntry implements ArgumentEntry {
public class TsRollingArgumentEntry implements ArgumentEntry {
private TreeMap<Long, Object> tsRecords;
@Override
public ArgumentType getType() {
return ArgumentType.LAST_RECORDS;
return ArgumentType.TS_ROLLING;
}
@JsonIgnore

View File

@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.cf;
public enum CalculatedFieldType {
SIMPLE, SCRIPT, LAST_RECORDS
SIMPLE, SCRIPT
}

View File

@ -69,10 +69,10 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
Argument argument = entry.getValue();
if (argument.getEntityId().equals(entityId)) {
switch (argument.getType()) {
case "ATTRIBUTES":
case "ATTRIBUTE":
linkConfiguration.getAttributes().put(entry.getKey(), argument.getKey());
break;
case "TIME_SERIES":
case "TS_LATEST", "TS_ROLLING":
linkConfiguration.getTimeSeries().put(entry.getKey(), argument.getKey());
break;
}

View File

@ -35,8 +35,7 @@ import java.util.UUID;
)
@JsonSubTypes({
@JsonSubTypes.Type(value = SimpleCalculatedFieldConfiguration.class, name = "SIMPLE"),
@JsonSubTypes.Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT"),
@JsonSubTypes.Type(value = LastRecordsCalculatedFieldConfiguration.class, name = "LAST_RECORDS")
@JsonSubTypes.Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT")
})
public interface CalculatedFieldConfiguration {

View File

@ -1,39 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import java.util.UUID;
@Data
public class LastRecordsCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration {
public LastRecordsCalculatedFieldConfiguration() {
}
public LastRecordsCalculatedFieldConfiguration(JsonNode config, EntityType entityType, UUID entityId) {
super(config, entityType, entityId);
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.LAST_RECORDS;
}
}

View File

@ -26,7 +26,6 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.LastRecordsCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -124,7 +123,6 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
return switch (CalculatedFieldType.valueOf(type)) {
case SIMPLE -> new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
case SCRIPT -> new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
case LAST_RECORDS -> new LastRecordsCalculatedFieldConfiguration(config, entityType, entityId);
};
}

View File

@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.LastRecordsCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -140,7 +139,6 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
return switch (type) {
case SIMPLE -> new SimpleCalculatedFieldConfiguration(config, entityType, entityId);
case SCRIPT -> new ScriptCalculatedFieldConfiguration(config, entityType, entityId);
case LAST_RECORDS -> new LastRecordsCalculatedFieldConfiguration(config, entityType, entityId);
};
}