added api limits and fixed tests
This commit is contained in:
parent
7e055ec353
commit
1a67769f1c
@ -198,20 +198,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
return state;
|
||||
}
|
||||
|
||||
private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) {
|
||||
if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) {
|
||||
return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) {
|
||||
if (!proto.getTbMsgType().isEmpty()) {
|
||||
return TbMsgType.valueOf(proto.getTbMsgType());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
|
||||
if (state.isReady()) {
|
||||
@ -290,4 +276,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
return cfIds;
|
||||
}
|
||||
|
||||
private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) {
|
||||
if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) {
|
||||
return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) {
|
||||
if (!proto.getTbMsgType().isEmpty()) {
|
||||
return TbMsgType.valueOf(proto.getTbMsgType());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -217,6 +217,13 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService());
|
||||
try {
|
||||
cfCtx.init();
|
||||
} catch (Exception e) {
|
||||
if (DebugModeUtil.isDebugAllAvailable(cf)) {
|
||||
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e);
|
||||
}
|
||||
}
|
||||
calculatedFields.put(cf.getId(), cfCtx);
|
||||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
||||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
||||
@ -257,6 +264,13 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
||||
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
||||
if (newCfCtx.hasSignificantChanges(oldCfCtx)) {
|
||||
try {
|
||||
newCfCtx.init();
|
||||
} catch (Exception e) {
|
||||
if (DebugModeUtil.isDebugAllAvailable(newCf)) {
|
||||
systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e);
|
||||
}
|
||||
}
|
||||
initCf(newCfCtx, callback, true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
@ -69,6 +70,7 @@ import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
|
||||
@ -142,14 +144,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
private final TimeseriesService timeseriesService;
|
||||
private final CalculatedFieldStateService stateService;
|
||||
private final TbClusterService clusterService;
|
||||
private final ApiLimitService apiLimitService;
|
||||
|
||||
private ListeningExecutorService calculatedFieldExecutor;
|
||||
private ListeningExecutorService calculatedFieldCallbackExecutor;
|
||||
|
||||
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>();
|
||||
|
||||
private static final int MAX_LAST_RECORDS_VALUE = 1024;
|
||||
|
||||
private static final Set<EntityType> supportedReferencedEntities = EnumSet.of(
|
||||
EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT
|
||||
);
|
||||
@ -560,7 +561,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
long currentTime = System.currentTimeMillis();
|
||||
long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow();
|
||||
long startTs = currentTime - timeWindow;
|
||||
int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit();
|
||||
long maxDataPoints = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
|
||||
int limit = argument.getLimit() == 0 ? (int) maxDataPoints : argument.getLimit();
|
||||
|
||||
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
|
||||
ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
|
||||
|
||||
@ -197,7 +197,8 @@ public class CalculatedFieldCtx {
|
||||
boolean entityIdChanged = !entityId.equals(other.entityId);
|
||||
boolean typeChanged = !cfType.equals(other.cfType);
|
||||
boolean argumentsChanged = !arguments.equals(other.arguments);
|
||||
return entityIdChanged || typeChanged || argumentsChanged;
|
||||
boolean expressionChanged = !expression.equals(other.expression);
|
||||
return entityIdChanged || typeChanged || argumentsChanged || expressionChanged;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -115,7 +115,7 @@ public class RocksDBStateService implements CalculatedFieldStateService {
|
||||
singleValueProtoBuilder.setVersion(entry.getVersion());
|
||||
}
|
||||
|
||||
KvEntry value = entry.getValue();
|
||||
KvEntry value = entry.getKvEntryValue();
|
||||
if (value != null) {
|
||||
singleValueProtoBuilder.setHasV(true)
|
||||
.setValue(ProtoUtils.toKeyValueProto(value));
|
||||
|
||||
@ -53,7 +53,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
||||
|
||||
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
|
||||
try {
|
||||
BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getValue();
|
||||
BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue();
|
||||
expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString()));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.");
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
@ -34,19 +35,19 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
public static final ArgumentEntry EMPTY = new SingleValueArgumentEntry(0);
|
||||
|
||||
private long ts;
|
||||
private BasicKvEntry value;
|
||||
private BasicKvEntry kvEntryValue;
|
||||
private Long version;
|
||||
|
||||
public SingleValueArgumentEntry(TsKvProto entry) {
|
||||
this.ts = entry.getTs();
|
||||
this.version = entry.getVersion();
|
||||
this.value = ProtoUtils.fromProto(entry.getKv());
|
||||
this.kvEntryValue = ProtoUtils.fromProto(entry.getKv());
|
||||
}
|
||||
|
||||
public SingleValueArgumentEntry(AttributeValueProto entry) {
|
||||
this.ts = entry.getLastUpdateTs();
|
||||
this.version = entry.getVersion();
|
||||
this.value = ProtoUtils.basicKvEntryFromProto(entry);
|
||||
this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry);
|
||||
}
|
||||
|
||||
public SingleValueArgumentEntry(KvEntry entry) {
|
||||
@ -57,7 +58,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
this.ts = attributeKvEntry.getLastUpdateTs();
|
||||
this.version = attributeKvEntry.getVersion();
|
||||
}
|
||||
this.value = ProtoUtils.basicKvEntryFromKvEntry(entry);
|
||||
this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -65,7 +66,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
* */
|
||||
private SingleValueArgumentEntry(int ignored) {
|
||||
this.ts = System.currentTimeMillis();
|
||||
this.value = null;
|
||||
this.kvEntryValue = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -73,9 +74,14 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
return ArgumentEntryType.SINGLE_VALUE;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public Object getValue() {
|
||||
return kvEntryValue.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArgumentEntry copy() {
|
||||
return new SingleValueArgumentEntry(this.ts, this.value, this.version);
|
||||
return new SingleValueArgumentEntry(this.ts, this.kvEntryValue, this.version);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -88,7 +94,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
|
||||
Long newVersion = singleValueEntry.getVersion();
|
||||
if (newVersion == null || this.version == null || newVersion > this.version) {
|
||||
this.ts = singleValueEntry.getTs();
|
||||
this.value = singleValueEntry.getValue();
|
||||
this.kvEntryValue = singleValueEntry.getKvEntryValue();
|
||||
this.version = newVersion;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -98,7 +98,7 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
|
||||
}
|
||||
|
||||
private boolean updateSingleValueEntry(SingleValueArgumentEntry singleValueEntry) {
|
||||
return addTsRecordIfAbsent(singleValueEntry.getTs(), singleValueEntry.getValue());
|
||||
return addTsRecordIfAbsent(singleValueEntry.getTs(), singleValueEntry.getKvEntryValue());
|
||||
}
|
||||
|
||||
private boolean addTsRecordIfAbsent(Long ts, KvEntry value) {
|
||||
|
||||
@ -48,9 +48,6 @@ import static org.thingsboard.server.dao.service.Validator.validateEntityId;
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultTbCalculatedFieldService extends AbstractTbEntityService implements TbCalculatedFieldService {
|
||||
|
||||
private static final int MAX_ARGUMENT_SIZE = 10;
|
||||
private static final int MAX_CALCULATED_FIELD_NUMBER = 10;
|
||||
|
||||
private final CalculatedFieldService calculatedFieldService;
|
||||
|
||||
@Override
|
||||
@ -62,9 +59,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
|
||||
CalculatedField existingCf = calculatedFieldService.findById(tenantId, calculatedField.getId());
|
||||
checkForEntityChange(existingCf, calculatedField);
|
||||
}
|
||||
checkCalculatedFieldNumber(tenantId, calculatedField.getEntityId());
|
||||
checkEntityExistence(tenantId, calculatedField.getEntityId());
|
||||
checkArgumentSize(calculatedField.getConfiguration());
|
||||
checkReferencedEntities(calculatedField.getConfiguration(), user);
|
||||
CalculatedField savedCalculatedField = checkNotNull(calculatedFieldService.save(calculatedField));
|
||||
logEntityActionService.logEntityAction(tenantId, savedCalculatedField.getId(), savedCalculatedField, actionType, user);
|
||||
@ -129,19 +124,6 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
|
||||
|
||||
}
|
||||
|
||||
private void checkArgumentSize(CalculatedFieldConfiguration calculatedFieldConfig) {
|
||||
if (calculatedFieldConfig.getArguments().size() > MAX_ARGUMENT_SIZE) {
|
||||
throw new IllegalArgumentException("Too many arguments: " + calculatedFieldConfig.getArguments().size() + ". Max number of argument is " + MAX_ARGUMENT_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkCalculatedFieldNumber(TenantId tenantId, EntityId entityId) {
|
||||
int numberOfCalculatedFieldsByEntityId = calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, entityId).size();
|
||||
if (numberOfCalculatedFieldsByEntityId >= MAX_CALCULATED_FIELD_NUMBER) {
|
||||
throw new IllegalArgumentException("Max number of calculated fields for entity is " + MAX_CALCULATED_FIELD_NUMBER);
|
||||
}
|
||||
}
|
||||
|
||||
private <E extends HasId<I> & HasTenantId, I extends EntityId> E findEntity(TenantId tenantId, EntityId entityId) {
|
||||
return switch (entityId.getEntityType()) {
|
||||
case TENANT, CUSTOMER, ASSET, DEVICE -> (E) entityService.fetchEntity(tenantId, entityId).orElse(null);
|
||||
|
||||
@ -43,7 +43,6 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -103,7 +102,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
@ -729,13 +727,13 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, TbQueueCallback callback) {
|
||||
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getEntityId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldDeleted(CalculatedField calculatedField, TbQueueCallback callback) {
|
||||
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getEntityId(), ComponentLifecycleEvent.DELETED);
|
||||
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED);
|
||||
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback);
|
||||
}
|
||||
|
||||
|
||||
@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -51,7 +53,7 @@ public class ScriptCalculatedFieldStateTest {
|
||||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5512071d-5abc-411d-a907-4cdb6539c2eb"));
|
||||
private final AssetId ASSET_ID = new AssetId(UUID.fromString("5bc010ae-bcfd-46c8-98b9-8ee8c8955a76"));
|
||||
|
||||
private final SingleValueArgumentEntry assetHumidityArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 10, 43, 122L);
|
||||
private final SingleValueArgumentEntry assetHumidityArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 10, new LongDataEntry("assetHumidity", 43L), 122L);
|
||||
private final TsRollingArgumentEntry deviceTemperatureArgEntry = createRollingArgEntry();
|
||||
|
||||
private final long ts = System.currentTimeMillis();
|
||||
@ -65,6 +67,7 @@ public class ScriptCalculatedFieldStateTest {
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService);
|
||||
ctx.init();
|
||||
state = new ScriptCalculatedFieldState(ctx.getArgNames());
|
||||
}
|
||||
|
||||
@ -93,7 +96,7 @@ public class ScriptCalculatedFieldStateTest {
|
||||
void testUpdateStateWhenUpdateExistingEntry() {
|
||||
state.arguments = new HashMap<>(Map.of("deviceTemperature", deviceTemperatureArgEntry, "assetHumidity", assetHumidityArgEntry));
|
||||
|
||||
SingleValueArgumentEntry newArgEntry = new SingleValueArgumentEntry(ts, 41, 349L);
|
||||
SingleValueArgumentEntry newArgEntry = new SingleValueArgumentEntry(ts, new LongDataEntry("assetHumidity", 41L), 349L);
|
||||
Map<String, ArgumentEntry> newArgs = Map.of("assetHumidity", newArgEntry);
|
||||
boolean stateUpdated = state.updateState(newArgs);
|
||||
|
||||
@ -116,17 +119,17 @@ public class ScriptCalculatedFieldStateTest {
|
||||
Output output = getCalculatedFieldConfig().getOutput();
|
||||
assertThat(result.getType()).isEqualTo(output.getType());
|
||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 13.0, "assetHumidity", 43));
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 13.0, "assetHumidity", 43L));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPerformCalculationWhenOldTelemetry() throws ExecutionException, InterruptedException {
|
||||
TsRollingArgumentEntry argumentEntry = new TsRollingArgumentEntry();
|
||||
|
||||
TreeMap<Long, Object> values = new TreeMap<>();
|
||||
values.put(ts - 40000, 4);// will not be used for calculation
|
||||
values.put(ts - 45000, 2);// will not be used for calculation
|
||||
values.put(ts - 20, 0);
|
||||
TreeMap<Long, BasicKvEntry> values = new TreeMap<>();
|
||||
values.put(ts - 40000, new LongDataEntry("deviceTemperature", 4L));// will not be used for calculation
|
||||
values.put(ts - 45000, new LongDataEntry("deviceTemperature", 2L));// will not be used for calculation
|
||||
values.put(ts - 20, new LongDataEntry("deviceTemperature", 0L));
|
||||
|
||||
argumentEntry.setTsRecords(values);
|
||||
|
||||
@ -138,19 +141,19 @@ public class ScriptCalculatedFieldStateTest {
|
||||
Output output = getCalculatedFieldConfig().getOutput();
|
||||
assertThat(result.getType()).isEqualTo(output.getType());
|
||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 0.0, "assetHumidity", 43));
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 0.0, "assetHumidity", 43L));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPerformCalculationWhenArgumentsMoreThanLimit() throws ExecutionException, InterruptedException {
|
||||
TsRollingArgumentEntry argumentEntry = new TsRollingArgumentEntry();
|
||||
TreeMap<Long, Object> values = new TreeMap<>();
|
||||
values.put(ts - 20, 1000);// will not be used
|
||||
values.put(ts - 18, 0);
|
||||
values.put(ts - 16, 0);
|
||||
values.put(ts - 14, 0);
|
||||
values.put(ts - 12, 0);
|
||||
values.put(ts - 10, 0);
|
||||
TreeMap<Long, BasicKvEntry> values = new TreeMap<>();
|
||||
values.put(ts - 20, new LongDataEntry("deviceTemperature", 1000L));// will not be used
|
||||
values.put(ts - 18, new LongDataEntry("deviceTemperature", 0L));
|
||||
values.put(ts - 16, new LongDataEntry("deviceTemperature", 0L));
|
||||
values.put(ts - 14, new LongDataEntry("deviceTemperature", 0L));
|
||||
values.put(ts - 12, new LongDataEntry("deviceTemperature", 0L));
|
||||
values.put(ts - 10, new LongDataEntry("deviceTemperature", 0L));
|
||||
argumentEntry.setTsRecords(values);
|
||||
|
||||
state.arguments = new HashMap<>(Map.of("deviceTemperature", argumentEntry, "assetHumidity", assetHumidityArgEntry));
|
||||
@ -161,7 +164,7 @@ public class ScriptCalculatedFieldStateTest {
|
||||
Output output = getCalculatedFieldConfig().getOutput();
|
||||
assertThat(result.getType()).isEqualTo(output.getType());
|
||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 0.0, "assetHumidity", 43));
|
||||
assertThat(result.getResultMap()).isEqualTo(Map.of("averageDeviceTemperature", 0.0, "assetHumidity", 43L));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -187,10 +190,10 @@ public class ScriptCalculatedFieldStateTest {
|
||||
TsRollingArgumentEntry argumentEntry = new TsRollingArgumentEntry();
|
||||
long ts = System.currentTimeMillis();
|
||||
|
||||
TreeMap<Long, Object> values = new TreeMap<>();
|
||||
values.put(ts - 40, 10);
|
||||
values.put(ts - 30, 12);
|
||||
values.put(ts - 20, 17);
|
||||
TreeMap<Long, BasicKvEntry> values = new TreeMap<>();
|
||||
values.put(ts - 40, new LongDataEntry("deviceTemperature", 10L));
|
||||
values.put(ts - 30, new LongDataEntry("deviceTemperature", 12L));
|
||||
values.put(ts - 20, new LongDataEntry("deviceTemperature", 17L));
|
||||
|
||||
argumentEntry.setTsRecords(values);
|
||||
return argumentEntry;
|
||||
|
||||
@ -30,6 +30,8 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -46,9 +48,9 @@ public class SimpleCalculatedFieldStateTest {
|
||||
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("5512071d-5abc-411d-a907-4cdb6539c2eb"));
|
||||
private final AssetId ASSET_ID = new AssetId(UUID.fromString("5bc010ae-bcfd-46c8-98b9-8ee8c8955a76"));
|
||||
|
||||
private final SingleValueArgumentEntry key1ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 10, 11, 145L);
|
||||
private final SingleValueArgumentEntry key2ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 6, 15, 165L);
|
||||
private final SingleValueArgumentEntry key3ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 3, 23, 184L);
|
||||
private final SingleValueArgumentEntry key1ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 10, new LongDataEntry("key1", 11L), 145L);
|
||||
private final SingleValueArgumentEntry key2ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 6, new LongDataEntry("key2", 15L), 165L);
|
||||
private final SingleValueArgumentEntry key3ArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis() - 3, new LongDataEntry("key3", 23L), 184L);
|
||||
|
||||
private SimpleCalculatedFieldState state;
|
||||
private CalculatedFieldCtx ctx;
|
||||
@ -56,6 +58,7 @@ public class SimpleCalculatedFieldStateTest {
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), null);
|
||||
ctx.init();
|
||||
state = new SimpleCalculatedFieldState(ctx.getArgNames());
|
||||
}
|
||||
|
||||
@ -88,7 +91,7 @@ public class SimpleCalculatedFieldStateTest {
|
||||
void testUpdateStateWhenUpdateExistingEntry() {
|
||||
state.arguments = new HashMap<>(Map.of("key1", key1ArgEntry));
|
||||
|
||||
SingleValueArgumentEntry newArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis(), 18, 190L);
|
||||
SingleValueArgumentEntry newArgEntry = new SingleValueArgumentEntry(System.currentTimeMillis(), new LongDataEntry("key1", 18L), 190L);
|
||||
Map<String, ArgumentEntry> newArgs = Map.of("key1", newArgEntry);
|
||||
boolean stateUpdated = state.updateState(newArgs);
|
||||
|
||||
@ -130,7 +133,7 @@ public class SimpleCalculatedFieldStateTest {
|
||||
void testPerformCalculationWhenPassedNotNumber() {
|
||||
state.arguments = new HashMap<>(Map.of(
|
||||
"key1", key1ArgEntry,
|
||||
"key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, "string", 124L),
|
||||
"key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new StringDataEntry("key2", "string"), 124L),
|
||||
"key3", key3ArgEntry
|
||||
));
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
@ -29,7 +30,7 @@ public class SingleValueArgumentEntryTest {
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
entry = new SingleValueArgumentEntry(ts, 11, 363L);
|
||||
entry = new SingleValueArgumentEntry(ts, new LongDataEntry("key", 11L), 363L);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -46,26 +47,26 @@ public class SingleValueArgumentEntryTest {
|
||||
|
||||
@Test
|
||||
void testUpdateEntryWithThaSameTs() {
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts, 13, 363L))).isFalse();
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts, new LongDataEntry("key", 13L), 363L))).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateEntryWhenNewVersionIsNull() {
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 16, 13, null))).isTrue();
|
||||
assertThat(entry.getValue()).isEqualTo(13);
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 16, new LongDataEntry("key", 13L), null))).isTrue();
|
||||
assertThat(entry.getValue()).isEqualTo(13L);
|
||||
assertThat(entry.getVersion()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateEntryWhenNewVersionIsGreaterThanCurrent() {
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, 18, 369L))).isTrue();
|
||||
assertThat(entry.getValue()).isEqualTo(18);
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 18L), 369L))).isTrue();
|
||||
assertThat(entry.getValue()).isEqualTo(18L);
|
||||
assertThat(entry.getVersion()).isEqualTo(369L);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testUpdateEntryWhenNewVersionIsLessThanCurrent() {
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, 18, 234L))).isFalse();
|
||||
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 18L), 234L))).isFalse();
|
||||
}
|
||||
|
||||
}
|
||||
@ -54,7 +54,7 @@ public class TsRollingArgumentEntryTest {
|
||||
|
||||
assertThat(entry.updateEntry(newEntry)).isTrue();
|
||||
assertThat(entry.getTsRecords()).hasSize(4);
|
||||
assertThat(entry.getTsRecords().get(ts - 10)).isEqualTo(23);
|
||||
assertThat(entry.getTsRecords().get(ts - 10).getValue()).isEqualTo(23.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -76,11 +76,11 @@ public class TsRollingArgumentEntryTest {
|
||||
assertThat(entry.updateEntry(newEntry)).isTrue();
|
||||
assertThat(entry.getTsRecords()).hasSize(5);
|
||||
assertThat(entry.getTsRecords()).isEqualTo(Map.of(
|
||||
ts - 40, 10,
|
||||
ts - 30, 12,
|
||||
ts - 20, 17,
|
||||
ts - 10, 7,
|
||||
ts - 5, 1
|
||||
ts - 40, new DoubleDataEntry("key", 10.0),
|
||||
ts - 30, new DoubleDataEntry("key", 12.0),
|
||||
ts - 20, new DoubleDataEntry("key", 17.0),
|
||||
ts - 10, new DoubleDataEntry("key", 7.0),
|
||||
ts - 5, new DoubleDataEntry("key", 1.0)
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
@ -135,6 +135,12 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
||||
|
||||
private double warnThreshold;
|
||||
|
||||
private long maxCalculatedFieldsPerTenant;
|
||||
private long maxCalculatedFieldsPerEntity;
|
||||
private long maxArgumentsPerCF;
|
||||
private long maxDataPointsPerRollingArg;
|
||||
private long maxStateSizeInKBytes;
|
||||
|
||||
@Override
|
||||
public long getProfileThreshold(ApiUsageRecordKey key) {
|
||||
return switch (key) {
|
||||
@ -175,6 +181,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
||||
case DASHBOARD -> maxDashboards;
|
||||
case RULE_CHAIN -> maxRuleChains;
|
||||
case EDGE -> maxEdges;
|
||||
case CALCULATED_FIELD -> maxCalculatedFieldsPerTenant;
|
||||
default -> 0;
|
||||
};
|
||||
}
|
||||
|
||||
@ -43,4 +43,6 @@ public interface CalculatedFieldDao extends Dao<CalculatedField> {
|
||||
|
||||
boolean existsByEntityId(TenantId tenantId, EntityId entityId);
|
||||
|
||||
long countCFByEntityId(TenantId tenantId, EntityId entityId);
|
||||
|
||||
}
|
||||
|
||||
@ -17,11 +17,15 @@ package org.thingsboard.server.dao.service.validator;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldDao;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
|
||||
@Component
|
||||
public class CalculatedFieldDataValidator extends DataValidator<CalculatedField> {
|
||||
@ -29,13 +33,40 @@ public class CalculatedFieldDataValidator extends DataValidator<CalculatedField>
|
||||
@Autowired
|
||||
private CalculatedFieldDao calculatedFieldDao;
|
||||
|
||||
@Autowired
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
@Override
|
||||
protected void validateCreate(TenantId tenantId, CalculatedField calculatedField) {
|
||||
validateNumberOfEntitiesPerTenant(tenantId, EntityType.CALCULATED_FIELD);
|
||||
validateNumberOfCFsPerEntity(tenantId, calculatedField.getEntityId());
|
||||
validateNumberOfArgumentsPerCF(tenantId, calculatedField);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CalculatedField validateUpdate(TenantId tenantId, CalculatedField calculatedField) {
|
||||
CalculatedField old = calculatedFieldDao.findById(calculatedField.getTenantId(), calculatedField.getId().getId());
|
||||
if (old == null) {
|
||||
throw new DataValidationException("Can't update non existing calculated field!");
|
||||
}
|
||||
validateNumberOfArgumentsPerCF(tenantId, calculatedField);
|
||||
return old;
|
||||
}
|
||||
|
||||
private void validateNumberOfCFsPerEntity(TenantId tenantId, EntityId entityId) {
|
||||
long maxCFsPerEntity = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxCalculatedFieldsPerEntity);
|
||||
long countCFByEntityId = calculatedFieldDao.countCFByEntityId(tenantId, entityId);
|
||||
|
||||
if (countCFByEntityId == maxCFsPerEntity) {
|
||||
throw new DataValidationException("Calculated fields per entity limit reached!");
|
||||
}
|
||||
}
|
||||
|
||||
private void validateNumberOfArgumentsPerCF(TenantId tenantId, CalculatedField calculatedField) {
|
||||
long maxArgumentsPerCF = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxArgumentsPerCF);
|
||||
if (calculatedField.getConfiguration().getArguments().size() > maxArgumentsPerCF) {
|
||||
throw new DataValidationException("Calculated field arguments limit reached!");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -38,4 +38,6 @@ public interface CalculatedFieldRepository extends JpaRepository<CalculatedField
|
||||
|
||||
List<CalculatedFieldEntity> removeAllByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
||||
|
||||
long countByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
||||
|
||||
}
|
||||
|
||||
@ -88,6 +88,11 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao<CalculatedFieldEntity,
|
||||
return calculatedFieldRepository.existsByTenantIdAndEntityId(tenantId.getId(), entityId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long countCFByEntityId(TenantId tenantId, EntityId entityId) {
|
||||
return calculatedFieldRepository.countByTenantIdAndEntityId(tenantId.getId(), entityId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<CalculatedFieldEntity> getEntityClass() {
|
||||
return CalculatedFieldEntity.class;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user