diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index daa6744fd9..49e2c9c46e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -98,6 +98,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; @@ -516,6 +517,11 @@ public class ActorSystemContext { @Getter private CalculatedFieldExecutionService calculatedFieldExecutionService; + @Lazy + @Autowired(required = false) + @Getter + private ApiLimitService apiLimitService; + @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private long maxConcurrentSessionsPerDevice; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 9392e3c02a..f6bc5b2d63 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -200,7 +200,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM @SneakyThrows private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) { - if (state.isReady()) { + if (state.isReady() && ctx.isInitialized()) { CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { @@ -209,7 +209,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { callback.onSuccess(); // State was updated but no calculation performed; } - cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); + cfService.pushStateToStorage(ctx, new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); } private Map mapToArguments(CalculatedFieldCtx ctx, List data) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index ec86e640d4..31c8eee23c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -95,7 +95,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onFieldInitMsg(CalculatedFieldInitMsg msg) { var cf = msg.getCf(); - var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); try { cfCtx.init(); } catch (Exception e) { @@ -220,7 +220,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { - var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); try { cfCtx.init(); } catch (Exception e) { @@ -248,7 +248,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId); callback.onSuccess(); } else { - var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService()); + var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService()); calculatedFields.put(newCf.getId(), newCfCtx); List oldCfList = entityIdCalculatedFields.get(newCf.getId()); List newCfList = new ArrayList<>(oldCfList.size()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 19f60165cf..393fbd3ec2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -43,7 +43,7 @@ public interface CalculatedFieldExecutionService { void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); - void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); + void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 7e841a0cf8..c8f9a7e882 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import java.util.Collections; @@ -57,6 +58,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { private final AssetService assetService; private final DeviceService deviceService; private final TbelInvokeService tbelInvokeService; + private final ApiLimitService apiLimitService; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -116,7 +118,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { if (ctx == null) { CalculatedField calculatedField = getCalculatedField(calculatedFieldId); if (calculatedField != null) { - ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService); + ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService); calculatedFieldsCtx.put(calculatedFieldId, ctx); log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index d8e42c916d..03c8f3de0c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -265,8 +265,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - stateService.persistState(stateId, state, callback); + public void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + stateService.persistState(ctx, stateId, state, callback); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java index c670e97580..e822d52767 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.cf.ctx; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.Map; @@ -24,7 +25,7 @@ public interface CalculatedFieldStateService { Map restoreStates(); - void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); + void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 21105a44cb..86d83a1f70 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import lombok.AllArgsConstructor; import lombok.Data; import java.util.ArrayList; @@ -23,6 +24,7 @@ import java.util.List; import java.util.Map; @Data +@AllArgsConstructor public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected List requiredArguments; @@ -34,8 +36,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { } public BaseCalculatedFieldState() { - this.requiredArguments = new ArrayList<>(); - this.arguments = new HashMap<>(); + this(new ArrayList<>(), new HashMap<>()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 4600628838..c483c6ab5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; @@ -67,7 +69,10 @@ public class CalculatedFieldCtx { private boolean initialized; - public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { + private long maxDataPointsPerRollingArg; + private long maxStateSizeInKBytes; + + public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) { this.calculatedField = calculatedField; this.cfId = calculatedField.getId(); @@ -96,6 +101,9 @@ public class CalculatedFieldCtx { this.output = configuration.getOutput(); this.expression = configuration.getExpression(); this.tbelInvokeService = tbelInvokeService; + + this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg); + this.maxStateSizeInKBytes = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes); } public void init() { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index 18e604118d..371751cc6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -59,8 +59,11 @@ public class RocksDBStateService implements CalculatedFieldStateService { } @Override - public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { - rocksDBService.put(toProto(stateId), toProto(stateId, state)); + public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + CalculatedFieldStateProto stateProto = toProto(stateId, state); + if (stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) { + rocksDBService.put(toProto(stateId), toProto(stateId, state)); + } callback.onSuccess(); } diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java index 42ff828dbd..bded0d058d 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldStateTest.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; import org.thingsboard.script.api.tbel.DefaultTbelInvokeService; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.AttributeScope; @@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -45,6 +47,8 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; @SpringBootTest(classes = DefaultTbelInvokeService.class) public class ScriptCalculatedFieldStateTest { @@ -64,9 +68,13 @@ public class ScriptCalculatedFieldStateTest { @Autowired private TbelInvokeService tbelInvokeService; + @MockBean + private ApiLimitService apiLimitService; + @BeforeEach void setUp() { - ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService); + when(apiLimitService.getLimit(any(), any())).thenReturn(1000L); + ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService); ctx.init(); state = new ScriptCalculatedFieldState(ctx.getArgNames()); } @@ -219,7 +227,7 @@ public class ScriptCalculatedFieldStateTest { ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("temperature", ArgumentType.TS_ROLLING, null); argument1.setRefEntityKey(refEntityKey1); argument1.setLimit(5); - argument1.setTimeWindow(30000); + argument1.setTimeWindow(30000L); Argument argument2 = new Argument(); ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("humidity", ArgumentType.TS_LATEST, null); diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java index d6b384d85b..d803ad2dab 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java @@ -17,6 +17,9 @@ package org.thingsboard.server.service.cf.ctx.state; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; @@ -32,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -41,7 +45,10 @@ import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) public class SimpleCalculatedFieldStateTest { private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5b18e321-3327-4290-b996-d72a65e90382")); @@ -55,9 +62,13 @@ public class SimpleCalculatedFieldStateTest { private SimpleCalculatedFieldState state; private CalculatedFieldCtx ctx; + @Mock + private ApiLimitService apiLimitService; + @BeforeEach void setUp() { - ctx = new CalculatedFieldCtx(getCalculatedField(), null); + when(apiLimitService.getLimit(any(), any())).thenReturn(1000L); + ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService); ctx.init(); state = new SimpleCalculatedFieldState(ctx.getArgNames()); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java index b61c3bc507..9923751db6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import org.springframework.lang.Nullable; import org.thingsboard.server.common.data.id.EntityId; @Data +@JsonInclude(JsonInclude.Include.NON_NULL) public class Argument { @Nullable @@ -27,7 +29,7 @@ public class Argument { private ReferencedEntityKey refEntityKey; private String defaultValue; - private int limit; - private long timeWindow; + private Integer limit; + private Long timeWindow; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java index 12cf97338a..b57b19d3ef 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; @Data +@JsonInclude(JsonInclude.Include.NON_NULL) public class Output { private String name; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java index b4bcc77a17..fd0bf3ceb7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java @@ -15,18 +15,18 @@ */ package org.thingsboard.server.common.data.cf.configuration; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; @Data @AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) public class ReferencedEntityKey { private String key; private ArgumentType type; private AttributeScope scope; - - }