From d14d0d4e8a8f011df869fedd96c9a09964c60af4 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 4 Feb 2025 17:03:25 +0200 Subject: [PATCH 1/2] used limits for state persistence --- .../server/actors/ActorSystemContext.java | 6 ++++++ .../CalculatedFieldEntityMessageProcessor.java | 4 ++-- .../CalculatedFieldManagerMessageProcessor.java | 6 +++--- .../service/cf/CalculatedFieldExecutionService.java | 2 +- .../service/cf/DefaultCalculatedFieldCache.java | 4 +++- .../cf/DefaultCalculatedFieldExecutionService.java | 4 ++-- .../service/cf/ctx/CalculatedFieldStateService.java | 3 ++- .../cf/ctx/state/BaseCalculatedFieldState.java | 5 +++-- .../service/cf/ctx/state/CalculatedFieldCtx.java | 10 +++++++++- .../service/cf/ctx/state/RocksDBStateService.java | 7 +++++-- .../ctx/state/ScriptCalculatedFieldStateTest.java | 12 ++++++++++-- .../ctx/state/SimpleCalculatedFieldStateTest.java | 13 ++++++++++++- .../common/data/cf/configuration/Argument.java | 6 ++++-- .../server/common/data/cf/configuration/Output.java | 2 ++ .../data/cf/configuration/ReferencedEntityKey.java | 4 ++-- 15 files changed, 66 insertions(+), 22 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index daa6744fd9..49e2c9c46e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -98,6 +98,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; @@ -516,6 +517,11 @@ public class ActorSystemContext { @Getter private CalculatedFieldExecutionService calculatedFieldExecutionService; + @Lazy + @Autowired(required = false) + @Getter + private ApiLimitService apiLimitService; + @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private long maxConcurrentSessionsPerDevice; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 9392e3c02a..f6bc5b2d63 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -200,7 +200,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM @SneakyThrows private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) { - if (state.isReady()) { + if (state.isReady() && ctx.isInitialized()) { CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { @@ -209,7 +209,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { callback.onSuccess(); // State was updated but no calculation performed; } - cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); + cfService.pushStateToStorage(ctx, new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); } private Map mapToArguments(CalculatedFieldCtx ctx, List data) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index ec86e640d4..31c8eee23c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -95,7 +95,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onFieldInitMsg(CalculatedFieldInitMsg msg) { var cf = msg.getCf(); - var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); try { cfCtx.init(); } catch (Exception e) { @@ -220,7 +220,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { - var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); try { cfCtx.init(); } catch (Exception e) { @@ -248,7 +248,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { - var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService()); + var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); calculatedFields.put(newCf.getId(), newCfCtx); List oldCfList = entityIdCalculatedFields.get(newCf.getId()); List newCfList = new ArrayList<>(oldCfList.size()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 19f60165cf..393fbd3ec2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -43,7 +43,7 @@ public interface CalculatedFieldExecutionService { void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); - void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); + void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 7e841a0cf8..c8f9a7e882 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import java.util.Collections; @@ -57,6 +58,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { private final AssetService assetService; private final DeviceService deviceService; private final TbelInvokeService tbelInvokeService; + private final ApiLimitService apiLimitService; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -116,7 +118,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { if (ctx == null) { CalculatedField calculatedField = getCalculatedField(calculatedFieldId); if (calculatedField != null) { - ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService); + ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService); calculatedFieldsCtx.put(calculatedFieldId, ctx); log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index d8e42c916d..03c8f3de0c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -265,8 +265,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - stateService.persistState(stateId, state, callback); + public void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + stateService.persistState(ctx, stateId, state, callback); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java index c670e97580..e822d52767 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.cf.ctx; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.Map; @@ -24,7 +25,7 @@ public interface CalculatedFieldStateService { Map restoreStates(); - void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); + void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 21105a44cb..86d83a1f70 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import lombok.AllArgsConstructor; import lombok.Data; import java.util.ArrayList; @@ -23,6 +24,7 @@ import java.util.List; import java.util.Map; @Data +@AllArgsConstructor public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected List requiredArguments; @@ -34,8 +36,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { } public BaseCalculatedFieldState() { - this.requiredArguments = new ArrayList<>(); - this.arguments = new HashMap<>(); + this(new ArrayList<>(), new HashMap<>()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 4600628838..c483c6ab5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; @@ -67,7 +69,10 @@ public class CalculatedFieldCtx { private boolean initialized; - public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { + private long maxDataPointsPerRollingArg; + private long maxStateSizeInKBytes; + + public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) { this.calculatedField = calculatedField; this.cfId = calculatedField.getId(); @@ -96,6 +101,9 @@ public class CalculatedFieldCtx { this.output = configuration.getOutput(); this.expression = configuration.getExpression(); this.tbelInvokeService = tbelInvokeService; + + this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg); + this.maxStateSizeInKBytes = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes); } public void init() { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index 18e604118d..371751cc6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -59,8 +59,11 @@ public class RocksDBStateService implements CalculatedFieldStateService { } @Override - public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - rocksDBService.put(toProto(stateId), toProto(stateId, state)); + public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + CalculatedFieldStateProto stateProto = toProto(stateId, state); + if (stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) { + rocksDBService.put(toProto(stateId), toProto(stateId, state)); + } callback.onSuccess(); } diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java index 42ff828dbd..bded0d058d 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; import org.thingsboard.script.api.tbel.DefaultTbelInvokeService; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.AttributeScope; @@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -45,6 +47,8 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; @SpringBootTest(classes = DefaultTbelInvokeService.class) public class ScriptCalculatedFieldStateTest { @@ -64,9 +68,13 @@ public class ScriptCalculatedFieldStateTest { @Autowired private TbelInvokeService tbelInvokeService; + @MockBean + private ApiLimitService apiLimitService; + @BeforeEach void setUp() { - ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService); + when(apiLimitService.getLimit(any(), any())).thenReturn(1000L); + ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService); ctx.init(); state = new ScriptCalculatedFieldState(ctx.getArgNames()); } @@ -219,7 +227,7 @@ public class ScriptCalculatedFieldStateTest { ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("temperature", ArgumentType.TS_ROLLING, null); argument1.setRefEntityKey(refEntityKey1); argument1.setLimit(5); - argument1.setTimeWindow(30000); + argument1.setTimeWindow(30000L); Argument argument2 = new Argument(); ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("humidity", ArgumentType.TS_LATEST, null); diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java index d6b384d85b..d803ad2dab 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java @@ -17,6 +17,9 @@ package org.thingsboard.server.service.cf.ctx.state; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; @@ -32,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -41,7 +45,10 @@ import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class SimpleCalculatedFieldStateTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5b18e321-3327-4290-b996-d72a65e90382")); @@ -55,9 +62,13 @@ public class SimpleCalculatedFieldStateTest { private SimpleCalculatedFieldState state; private CalculatedFieldCtx ctx; + @Mock + private ApiLimitService apiLimitService; + @BeforeEach void setUp() { - ctx = new CalculatedFieldCtx(getCalculatedField(), null); + when(apiLimitService.getLimit(any(), any())).thenReturn(1000L); + ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService); ctx.init(); state = new SimpleCalculatedFieldState(ctx.getArgNames()); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java index b61c3bc507..9923751db6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import org.springframework.lang.Nullable; import org.thingsboard.server.common.data.id.EntityId; @Data +@JsonInclude(JsonInclude.Include.NON_NULL) public class Argument { @Nullable @@ -27,7 +29,7 @@ public class Argument { private ReferencedEntityKey refEntityKey; private String defaultValue; - private int limit; - private long timeWindow; + private Integer limit; + private Long timeWindow; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java index 12cf97338a..b57b19d3ef 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; @Data +@JsonInclude(JsonInclude.Include.NON_NULL) public class Output { private String name; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java index b4bcc77a17..fd0bf3ceb7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java @@ -15,18 +15,18 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; @Data @AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) public class ReferencedEntityKey { private String key; private ArgumentType type; private AttributeScope scope; - - } From 3818d1cb68d35129e535d96f2ddb098c59812bdb Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 5 Feb 2025 12:34:10 +0200 Subject: [PATCH 2/2] debug events(wip) --- .../server/actors/ActorSystemContext.java | 95 ++++++++++++------- .../cf/ctx/state/RocksDBStateService.java | 3 +- .../permission/TenantAdminPermissions.java | 1 + .../src/main/resources/thingsboard.yml | 6 ++ .../dao/cf/BaseCalculatedFieldService.java | 2 +- .../CalculatedFieldDebugEventRepository.java | 2 +- .../main/resources/sql/schema-entities.sql | 4 +- 7 files changed, 72 insertions(+), 41 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 49e2c9c46e..652cdabf77 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -137,6 +137,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Slf4j @Component @@ -179,6 +180,8 @@ public class ActorSystemContext { private final ConcurrentMap debugPerTenantLimits = new ConcurrentHashMap<>(); + private final ConcurrentMap cfDebugPerTenantLimits = new ConcurrentHashMap<>(); + public ConcurrentMap getDebugPerTenantLimits() { return debugPerTenantLimits; } @@ -441,6 +444,11 @@ public class ActorSystemContext { @Getter private TbCoreToTransportService tbCoreToTransportService; + @Lazy + @Autowired(required = false) + @Getter + private ApiLimitService apiLimitService; + /** * The following Service will be null if we operate in tb-core mode */ @@ -517,11 +525,6 @@ public class ActorSystemContext { @Getter private CalculatedFieldExecutionService calculatedFieldExecutionService; - @Lazy - @Autowired(required = false) - @Getter - private ApiLimitService apiLimitService; - @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private long maxConcurrentSessionsPerDevice; @@ -625,6 +628,14 @@ public class ActorSystemContext { @Getter private String deviceStateNodeRateLimitConfig; + @Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.enabled:true}") + @Getter + private boolean cfDebugPerTenantEnabled; + + @Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.configuration:50000:3600}") + @Getter + private String cfDebugPerTenantLimitsConfiguration; + @Getter @Setter private TbActorSystem actorSystem; @@ -753,37 +764,6 @@ public class ActorSystemContext { } } - public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) { - try { - CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() - .tenantId(tenantId) - .entityId(entityId.getId()) - .serviceId(getServiceId()) - .calculatedFieldId(calculatedFieldId) - .eventEntity(entityId); - if (tbMsgId != null) { - eventBuilder.msgId(tbMsgId); - } - if (tbMsgType != null) { - eventBuilder.msgType(tbMsgType.name()); - } - if (arguments != null) { - eventBuilder.arguments(JacksonUtil.toString(arguments)); - } - if (result != null) { - eventBuilder.result(result); - } - if (error != null) { - eventBuilder.error(toString(error)); - } - - ListenableFuture future = eventService.saveAsync(eventBuilder.build()); - Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); - } catch (IllegalArgumentException ex) { - log.warn("Failed to persist calculated field debug message", ex); - } - } - private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) { if (debugPerTenantEnabled) { DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id -> @@ -817,6 +797,49 @@ public class ActorSystemContext { Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); } + public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) { + if (cfDebugPerTenantEnabled) { + TbRateLimits rateLimits = cfDebugPerTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(cfDebugPerTenantLimitsConfiguration)); + + if (rateLimits.tryConsume()) { + try { + CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() + .tenantId(tenantId) + .entityId(calculatedFieldId.getId()) + .serviceId(getServiceId()) + .calculatedFieldId(calculatedFieldId) + .eventEntity(entityId); + if (tbMsgId != null) { + eventBuilder.msgId(tbMsgId); + } + if (tbMsgType != null) { + eventBuilder.msgType(tbMsgType.name()); + } + if (arguments != null) { + eventBuilder.arguments(JacksonUtil.toString( + arguments.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue())) + )); + } + if (result != null) { + eventBuilder.result(result); + } + if (error != null) { + eventBuilder.error(toString(error)); + } + + ListenableFuture future = eventService.saveAsync(eventBuilder.build()); + Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); + } catch (IllegalArgumentException ex) { + log.warn("Failed to persist calculated field debug message", ex); + } + if (log.isTraceEnabled()) { + log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, calculatedFieldId); + } + } + } + } + public static Exception toException(Throwable error) { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index 371751cc6f..8a6a5c9cb7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -61,7 +61,8 @@ public class RocksDBStateService implements CalculatedFieldStateService { @Override public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { CalculatedFieldStateProto stateProto = toProto(stateId, state); - if (stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) { + long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes(); + if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) { rocksDBService.put(toProto(stateId), toProto(stateId, state)); } callback.onSuccess(); diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java index 5b98a56f24..b0e35e57e4 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java @@ -55,6 +55,7 @@ public class TenantAdminPermissions extends AbstractPermissions { put(Resource.OAUTH2_CONFIGURATION_TEMPLATE, new PermissionChecker.GenericPermissionChecker(Operation.READ)); put(Resource.MOBILE_APP, tenantEntityPermissionChecker); put(Resource.MOBILE_APP_BUNDLE, tenantEntityPermissionChecker); + put(Resource.CALCULATED_FIELD, tenantEntityPermissionChecker); } public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 30a3e51d8c..197f05a5d5 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -510,6 +510,12 @@ actors: js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}" # Actors statistic persistence frequency in milliseconds persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" + calculated_fields: + debug_mode_rate_limits_per_tenant: + # Enable/Disable the rate limit of persisted debug events for all calculated fields per tenant + enabled: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" + # The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour + configuration: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}" debug: settings: diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index d37a4a53c1..b062a2de5e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -62,9 +62,9 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements try { TenantId tenantId = calculatedField.getTenantId(); log.trace("Executing save calculated field, [{}]", calculatedField); + updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis()); CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField); createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField); - updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis()); eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId()) .entity(savedCalculatedField).oldEntity(oldCalculatedField).created(calculatedField.getId() == null).build()); return savedCalculatedField; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/CalculatedFieldDebugEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/CalculatedFieldDebugEventRepository.java index c0bd21ca74..a759babcfb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/CalculatedFieldDebugEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/CalculatedFieldDebugEventRepository.java @@ -35,7 +35,7 @@ public interface CalculatedFieldDebugEventRepository extends EventRepository findLatestEvents(@Param("tenantId") UUID tenantId, @Param("entityId") UUID entityId, @Param("limit") int limit); @Override - @Query("SELECT e FROM RuleNodeDebugEventEntity e WHERE " + + @Query("SELECT e FROM CalculatedFieldDebugEventEntity e WHERE " + "e.tenantId = :tenantId " + "AND e.entityId = :entityId " + "AND (:startTime IS NULL OR e.ts >= :startTime) " + diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index a6d1e8800d..4b395a5768 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -955,10 +955,10 @@ CREATE TABLE IF NOT EXISTS cf_debug_event ( id uuid NOT NULL, tenant_id uuid NOT NULL , ts bigint NOT NULL, - entity_id uuid NOT NULL, + entity_id uuid NOT NULL, -- calculated field id service_id varchar, cf_id uuid NOT NULL, - e_entity_id uuid, + e_entity_id uuid, -- target entity id e_entity_type varchar, e_msg_id uuid, e_msg_type varchar,