used limits for state persistence

This commit is contained in:
IrynaMatveieva 2025-02-04 17:03:25 +02:00
parent 5c63456ab1
commit d14d0d4e8a
15 changed files with 66 additions and 22 deletions

View File

@ -98,6 +98,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.dao.widget.WidgetsBundleService;
@ -516,6 +517,11 @@ public class ActorSystemContext {
@Getter @Getter
private CalculatedFieldExecutionService calculatedFieldExecutionService; private CalculatedFieldExecutionService calculatedFieldExecutionService;
@Lazy
@Autowired(required = false)
@Getter
private ApiLimitService apiLimitService;
@Value("${actors.session.max_concurrent_sessions_per_device:1}") @Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter @Getter
private long maxConcurrentSessionsPerDevice; private long maxConcurrentSessionsPerDevice;

View File

@ -200,7 +200,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
@SneakyThrows @SneakyThrows
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) { private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
if (state.isReady()) { if (state.isReady() && ctx.isInitialized()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
@ -209,7 +209,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} else { } else {
callback.onSuccess(); // State was updated but no calculation performed; callback.onSuccess(); // State was updated but no calculation performed;
} }
cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); cfService.pushStateToStorage(ctx, new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback);
} }
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) { private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {

View File

@ -95,7 +95,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onFieldInitMsg(CalculatedFieldInitMsg msg) { public void onFieldInitMsg(CalculatedFieldInitMsg msg) {
var cf = msg.getCf(); var cf = msg.getCf();
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
try { try {
cfCtx.init(); cfCtx.init();
} catch (Exception e) { } catch (Exception e) {
@ -220,7 +220,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess(); callback.onSuccess();
} else { } else {
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
try { try {
cfCtx.init(); cfCtx.init();
} catch (Exception e) { } catch (Exception e) {
@ -248,7 +248,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
callback.onSuccess(); callback.onSuccess();
} else { } else {
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService()); var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
calculatedFields.put(newCf.getId(), newCfCtx); calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getId()); List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getId());
List<CalculatedFieldCtx> newCfList = new ArrayList<>(oldCfList.size()); List<CalculatedFieldCtx> newCfList = new ArrayList<>(oldCfList.size());

View File

@ -43,7 +43,7 @@ public interface CalculatedFieldExecutionService {
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.Collections; import java.util.Collections;
@ -57,6 +58,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
private final AssetService assetService; private final AssetService assetService;
private final DeviceService deviceService; private final DeviceService deviceService;
private final TbelInvokeService tbelInvokeService; private final TbelInvokeService tbelInvokeService;
private final ApiLimitService apiLimitService;
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, List<CalculatedField>> entityIdCalculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap<EntityId, List<CalculatedField>> entityIdCalculatedFields = new ConcurrentHashMap<>();
@ -116,7 +118,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
if (ctx == null) { if (ctx == null) {
CalculatedField calculatedField = getCalculatedField(calculatedFieldId); CalculatedField calculatedField = getCalculatedField(calculatedFieldId);
if (calculatedField != null) { if (calculatedField != null) {
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService); ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService);
calculatedFieldsCtx.put(calculatedFieldId, ctx); calculatedFieldsCtx.put(calculatedFieldId, ctx);
log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx); log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx);
} }

View File

@ -265,8 +265,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
} }
@Override @Override
public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { public void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
stateService.persistState(stateId, state, callback); stateService.persistState(ctx, stateId, state, callback);
} }
@Override @Override

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.cf.ctx; package org.thingsboard.server.service.cf.ctx;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.Map; import java.util.Map;
@ -24,7 +25,7 @@ public interface CalculatedFieldStateService {
Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates(); Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates();
void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback);

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.cf.ctx.state; package org.thingsboard.server.service.cf.ctx.state;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import java.util.ArrayList; import java.util.ArrayList;
@ -23,6 +24,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@Data @Data
@AllArgsConstructor
public abstract class BaseCalculatedFieldState implements CalculatedFieldState { public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected List<String> requiredArguments; protected List<String> requiredArguments;
@ -34,8 +36,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
} }
public BaseCalculatedFieldState() { public BaseCalculatedFieldState() {
this.requiredArguments = new ArrayList<>(); this(new ArrayList<>(), new HashMap<>());
this.arguments = new HashMap<>();
} }
@Override @Override

View File

@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
@ -67,7 +69,10 @@ public class CalculatedFieldCtx {
private boolean initialized; private boolean initialized;
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { private long maxDataPointsPerRollingArg;
private long maxStateSizeInKBytes;
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) {
this.calculatedField = calculatedField; this.calculatedField = calculatedField;
this.cfId = calculatedField.getId(); this.cfId = calculatedField.getId();
@ -96,6 +101,9 @@ public class CalculatedFieldCtx {
this.output = configuration.getOutput(); this.output = configuration.getOutput();
this.expression = configuration.getExpression(); this.expression = configuration.getExpression();
this.tbelInvokeService = tbelInvokeService; this.tbelInvokeService = tbelInvokeService;
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
this.maxStateSizeInKBytes = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes);
} }
public void init() { public void init() {

View File

@ -59,8 +59,11 @@ public class RocksDBStateService implements CalculatedFieldStateService {
} }
@Override @Override
public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
rocksDBService.put(toProto(stateId), toProto(stateId, state)); CalculatedFieldStateProto stateProto = toProto(stateId, state);
if (stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) {
rocksDBService.put(toProto(stateId), toProto(stateId, state));
}
callback.onSuccess(); callback.onSuccess();
} }

View File

@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.thingsboard.script.api.tbel.DefaultTbelInvokeService; import org.thingsboard.script.api.tbel.DefaultTbelInvokeService;
import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.data.kv.BasicKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap; import java.util.HashMap;
@ -45,6 +47,8 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@SpringBootTest(classes = DefaultTbelInvokeService.class) @SpringBootTest(classes = DefaultTbelInvokeService.class)
public class ScriptCalculatedFieldStateTest { public class ScriptCalculatedFieldStateTest {
@ -64,9 +68,13 @@ public class ScriptCalculatedFieldStateTest {
@Autowired @Autowired
private TbelInvokeService tbelInvokeService; private TbelInvokeService tbelInvokeService;
@MockBean
private ApiLimitService apiLimitService;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService); when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService);
ctx.init(); ctx.init();
state = new ScriptCalculatedFieldState(ctx.getArgNames()); state = new ScriptCalculatedFieldState(ctx.getArgNames());
} }
@ -219,7 +227,7 @@ public class ScriptCalculatedFieldStateTest {
ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("temperature", ArgumentType.TS_ROLLING, null); ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("temperature", ArgumentType.TS_ROLLING, null);
argument1.setRefEntityKey(refEntityKey1); argument1.setRefEntityKey(refEntityKey1);
argument1.setLimit(5); argument1.setLimit(5);
argument1.setTimeWindow(30000); argument1.setTimeWindow(30000L);
Argument argument2 = new Argument(); Argument argument2 = new Argument();
ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("humidity", ArgumentType.TS_LATEST, null); ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("humidity", ArgumentType.TS_LATEST, null);

View File

@ -17,6 +17,9 @@ package org.thingsboard.server.service.cf.ctx.state;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.CalculatedFieldType;
@ -32,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap; import java.util.HashMap;
@ -41,7 +45,10 @@ import java.util.concurrent.ExecutionException;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SimpleCalculatedFieldStateTest { public class SimpleCalculatedFieldStateTest {
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5b18e321-3327-4290-b996-d72a65e90382")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5b18e321-3327-4290-b996-d72a65e90382"));
@ -55,9 +62,13 @@ public class SimpleCalculatedFieldStateTest {
private SimpleCalculatedFieldState state; private SimpleCalculatedFieldState state;
private CalculatedFieldCtx ctx; private CalculatedFieldCtx ctx;
@Mock
private ApiLimitService apiLimitService;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
ctx = new CalculatedFieldCtx(getCalculatedField(), null); when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService);
ctx.init(); ctx.init();
state = new SimpleCalculatedFieldState(ctx.getArgNames()); state = new SimpleCalculatedFieldState(ctx.getArgNames());
} }

View File

@ -15,11 +15,13 @@
*/ */
package org.thingsboard.server.common.data.cf.configuration; package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data; import lombok.Data;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@Data @Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Argument { public class Argument {
@Nullable @Nullable
@ -27,7 +29,7 @@ public class Argument {
private ReferencedEntityKey refEntityKey; private ReferencedEntityKey refEntityKey;
private String defaultValue; private String defaultValue;
private int limit; private Integer limit;
private long timeWindow; private Long timeWindow;
} }

View File

@ -15,10 +15,12 @@
*/ */
package org.thingsboard.server.common.data.cf.configuration; package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
@Data @Data
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Output { public class Output {
private String name; private String name;

View File

@ -15,18 +15,18 @@
*/ */
package org.thingsboard.server.common.data.cf.configuration; package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ReferencedEntityKey { public class ReferencedEntityKey {
private String key; private String key;
private ArgumentType type; private ArgumentType type;
private AttributeScope scope; private AttributeScope scope;
} }