Merge pull request #12592 from irynamatveieva/calculated-fields
Calculated fields
This commit is contained in:
commit
14a758f7da
@ -98,6 +98,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
import org.thingsboard.server.dao.widget.WidgetTypeService;
|
||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
@ -136,6 +137,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@ -178,6 +180,8 @@ public class ActorSystemContext {
|
||||
|
||||
private final ConcurrentMap<TenantId, DebugTbRateLimits> debugPerTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
private final ConcurrentMap<TenantId, TbRateLimits> cfDebugPerTenantLimits = new ConcurrentHashMap<>();
|
||||
|
||||
public ConcurrentMap<TenantId, DebugTbRateLimits> getDebugPerTenantLimits() {
|
||||
return debugPerTenantLimits;
|
||||
}
|
||||
@ -440,6 +444,11 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private TbCoreToTransportService tbCoreToTransportService;
|
||||
|
||||
@Lazy
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
/**
|
||||
* The following Service will be null if we operate in tb-core mode
|
||||
*/
|
||||
@ -619,6 +628,14 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private String deviceStateNodeRateLimitConfig;
|
||||
|
||||
@Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.enabled:true}")
|
||||
@Getter
|
||||
private boolean cfDebugPerTenantEnabled;
|
||||
|
||||
@Value("${actors.calculated_fields.debug_mode_rate_limits_per_tenant.configuration:50000:3600}")
|
||||
@Getter
|
||||
private String cfDebugPerTenantLimitsConfiguration;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private TbActorSystem actorSystem;
|
||||
@ -747,37 +764,6 @@ public class ActorSystemContext {
|
||||
}
|
||||
}
|
||||
|
||||
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
|
||||
try {
|
||||
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
|
||||
.tenantId(tenantId)
|
||||
.entityId(entityId.getId())
|
||||
.serviceId(getServiceId())
|
||||
.calculatedFieldId(calculatedFieldId)
|
||||
.eventEntity(entityId);
|
||||
if (tbMsgId != null) {
|
||||
eventBuilder.msgId(tbMsgId);
|
||||
}
|
||||
if (tbMsgType != null) {
|
||||
eventBuilder.msgType(tbMsgType.name());
|
||||
}
|
||||
if (arguments != null) {
|
||||
eventBuilder.arguments(JacksonUtil.toString(arguments));
|
||||
}
|
||||
if (result != null) {
|
||||
eventBuilder.result(result);
|
||||
}
|
||||
if (error != null) {
|
||||
eventBuilder.error(toString(error));
|
||||
}
|
||||
|
||||
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());
|
||||
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.warn("Failed to persist calculated field debug message", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) {
|
||||
if (debugPerTenantEnabled) {
|
||||
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id ->
|
||||
@ -811,6 +797,49 @@ public class ActorSystemContext {
|
||||
Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
|
||||
if (cfDebugPerTenantEnabled) {
|
||||
TbRateLimits rateLimits = cfDebugPerTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(cfDebugPerTenantLimitsConfiguration));
|
||||
|
||||
if (rateLimits.tryConsume()) {
|
||||
try {
|
||||
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
|
||||
.tenantId(tenantId)
|
||||
.entityId(calculatedFieldId.getId())
|
||||
.serviceId(getServiceId())
|
||||
.calculatedFieldId(calculatedFieldId)
|
||||
.eventEntity(entityId);
|
||||
if (tbMsgId != null) {
|
||||
eventBuilder.msgId(tbMsgId);
|
||||
}
|
||||
if (tbMsgType != null) {
|
||||
eventBuilder.msgType(tbMsgType.name());
|
||||
}
|
||||
if (arguments != null) {
|
||||
eventBuilder.arguments(JacksonUtil.toString(
|
||||
arguments.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()))
|
||||
));
|
||||
}
|
||||
if (result != null) {
|
||||
eventBuilder.result(result);
|
||||
}
|
||||
if (error != null) {
|
||||
eventBuilder.error(toString(error));
|
||||
}
|
||||
|
||||
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());
|
||||
Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.warn("Failed to persist calculated field debug message", ex);
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("[{}] Tenant level debug mode rate limit detected: {}", tenantId, calculatedFieldId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Exception toException(Throwable error) {
|
||||
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
|
||||
}
|
||||
|
||||
@ -200,7 +200,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
|
||||
@SneakyThrows
|
||||
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
|
||||
if (state.isReady()) {
|
||||
if (state.isReady() && ctx.isInitialized()) {
|
||||
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
|
||||
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
|
||||
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
|
||||
@ -209,7 +209,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
} else {
|
||||
callback.onSuccess(); // State was updated but no calculation performed;
|
||||
}
|
||||
cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback);
|
||||
cfService.pushStateToStorage(ctx, new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback);
|
||||
}
|
||||
|
||||
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {
|
||||
|
||||
@ -95,7 +95,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
|
||||
public void onFieldInitMsg(CalculatedFieldInitMsg msg) {
|
||||
var cf = msg.getCf();
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService());
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
|
||||
try {
|
||||
cfCtx.init();
|
||||
} catch (Exception e) {
|
||||
@ -220,7 +220,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService());
|
||||
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
|
||||
try {
|
||||
cfCtx.init();
|
||||
} catch (Exception e) {
|
||||
@ -248,7 +248,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService());
|
||||
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
|
||||
calculatedFields.put(newCf.getId(), newCfCtx);
|
||||
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getId());
|
||||
List<CalculatedFieldCtx> newCfList = new ArrayList<>(oldCfList.size());
|
||||
|
||||
@ -43,7 +43,7 @@ public interface CalculatedFieldExecutionService {
|
||||
|
||||
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
|
||||
|
||||
void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
|
||||
void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
|
||||
|
||||
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
|
||||
|
||||
|
||||
@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||
|
||||
import java.util.Collections;
|
||||
@ -57,6 +58,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
private final AssetService assetService;
|
||||
private final DeviceService deviceService;
|
||||
private final TbelInvokeService tbelInvokeService;
|
||||
private final ApiLimitService apiLimitService;
|
||||
|
||||
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<EntityId, List<CalculatedField>> entityIdCalculatedFields = new ConcurrentHashMap<>();
|
||||
@ -116,7 +118,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
if (ctx == null) {
|
||||
CalculatedField calculatedField = getCalculatedField(calculatedFieldId);
|
||||
if (calculatedField != null) {
|
||||
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService);
|
||||
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService, apiLimitService);
|
||||
calculatedFieldsCtx.put(calculatedFieldId, ctx);
|
||||
log.debug("[{}] Put calculated field ctx into cache: {}", calculatedFieldId, ctx);
|
||||
}
|
||||
|
||||
@ -265,8 +265,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
stateService.persistState(stateId, state, callback);
|
||||
public void pushStateToStorage(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
stateService.persistState(ctx, stateId, state, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.service.cf.ctx;
|
||||
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
|
||||
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
|
||||
|
||||
import java.util.Map;
|
||||
@ -24,7 +25,7 @@ public interface CalculatedFieldStateService {
|
||||
|
||||
Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates();
|
||||
|
||||
void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
|
||||
void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
|
||||
|
||||
void removeState(CalculatedFieldEntityCtxId stateId, TbCallback callback);
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -23,6 +24,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
||||
|
||||
protected List<String> requiredArguments;
|
||||
@ -34,8 +36,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
|
||||
}
|
||||
|
||||
public BaseCalculatedFieldState() {
|
||||
this.requiredArguments = new ArrayList<>();
|
||||
this.arguments = new HashMap<>();
|
||||
this(new ArrayList<>(), new HashMap<>());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||
import org.thingsboard.server.common.data.util.TbPair;
|
||||
import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
|
||||
|
||||
@ -67,7 +69,10 @@ public class CalculatedFieldCtx {
|
||||
|
||||
private boolean initialized;
|
||||
|
||||
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) {
|
||||
private long maxDataPointsPerRollingArg;
|
||||
private long maxStateSizeInKBytes;
|
||||
|
||||
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) {
|
||||
this.calculatedField = calculatedField;
|
||||
|
||||
this.cfId = calculatedField.getId();
|
||||
@ -96,6 +101,9 @@ public class CalculatedFieldCtx {
|
||||
this.output = configuration.getOutput();
|
||||
this.expression = configuration.getExpression();
|
||||
this.tbelInvokeService = tbelInvokeService;
|
||||
|
||||
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
|
||||
this.maxStateSizeInKBytes = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes);
|
||||
}
|
||||
|
||||
public void init() {
|
||||
|
||||
@ -59,8 +59,12 @@ public class RocksDBStateService implements CalculatedFieldStateService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
rocksDBService.put(toProto(stateId), toProto(stateId, state));
|
||||
public void persistState(CalculatedFieldCtx ctx, CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
|
||||
CalculatedFieldStateProto stateProto = toProto(stateId, state);
|
||||
long maxStateSizeInKBytes = ctx.getMaxStateSizeInKBytes();
|
||||
if (maxStateSizeInKBytes <= 0 || stateProto.getSerializedSize() <= ctx.getMaxStateSizeInKBytes()) {
|
||||
rocksDBService.put(toProto(stateId), toProto(stateId, state));
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
|
||||
@ -55,6 +55,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
|
||||
put(Resource.OAUTH2_CONFIGURATION_TEMPLATE, new PermissionChecker.GenericPermissionChecker(Operation.READ));
|
||||
put(Resource.MOBILE_APP, tenantEntityPermissionChecker);
|
||||
put(Resource.MOBILE_APP_BUNDLE, tenantEntityPermissionChecker);
|
||||
put(Resource.CALCULATED_FIELD, tenantEntityPermissionChecker);
|
||||
}
|
||||
|
||||
public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {
|
||||
|
||||
@ -510,6 +510,12 @@ actors:
|
||||
js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}"
|
||||
# Actors statistic persistence frequency in milliseconds
|
||||
persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}"
|
||||
calculated_fields:
|
||||
debug_mode_rate_limits_per_tenant:
|
||||
# Enable/Disable the rate limit of persisted debug events for all calculated fields per tenant
|
||||
enabled: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"
|
||||
# The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour
|
||||
configuration: "${ACTORS_CF_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}"
|
||||
|
||||
debug:
|
||||
settings:
|
||||
|
||||
@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.thingsboard.script.api.tbel.DefaultTbelInvokeService;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -45,6 +47,8 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@SpringBootTest(classes = DefaultTbelInvokeService.class)
|
||||
public class ScriptCalculatedFieldStateTest {
|
||||
@ -64,9 +68,13 @@ public class ScriptCalculatedFieldStateTest {
|
||||
@Autowired
|
||||
private TbelInvokeService tbelInvokeService;
|
||||
|
||||
@MockBean
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService);
|
||||
when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), tbelInvokeService, apiLimitService);
|
||||
ctx.init();
|
||||
state = new ScriptCalculatedFieldState(ctx.getArgNames());
|
||||
}
|
||||
@ -219,7 +227,7 @@ public class ScriptCalculatedFieldStateTest {
|
||||
ReferencedEntityKey refEntityKey1 = new ReferencedEntityKey("temperature", ArgumentType.TS_ROLLING, null);
|
||||
argument1.setRefEntityKey(refEntityKey1);
|
||||
argument1.setLimit(5);
|
||||
argument1.setTimeWindow(30000);
|
||||
argument1.setTimeWindow(30000L);
|
||||
|
||||
Argument argument2 = new Argument();
|
||||
ReferencedEntityKey refEntityKey2 = new ReferencedEntityKey("humidity", ArgumentType.TS_LATEST, null);
|
||||
|
||||
@ -17,6 +17,9 @@ package org.thingsboard.server.service.cf.ctx.state;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||
@ -32,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.dao.usagerecord.ApiLimitService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||
|
||||
import java.util.HashMap;
|
||||
@ -41,7 +45,10 @@ import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class SimpleCalculatedFieldStateTest {
|
||||
|
||||
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("5b18e321-3327-4290-b996-d72a65e90382"));
|
||||
@ -55,9 +62,13 @@ public class SimpleCalculatedFieldStateTest {
|
||||
private SimpleCalculatedFieldState state;
|
||||
private CalculatedFieldCtx ctx;
|
||||
|
||||
@Mock
|
||||
private ApiLimitService apiLimitService;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), null);
|
||||
when(apiLimitService.getLimit(any(), any())).thenReturn(1000L);
|
||||
ctx = new CalculatedFieldCtx(getCalculatedField(), null, apiLimitService);
|
||||
ctx.init();
|
||||
state = new SimpleCalculatedFieldState(ctx.getArgNames());
|
||||
}
|
||||
|
||||
@ -15,11 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.data.cf.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.Data;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
|
||||
@Data
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class Argument {
|
||||
|
||||
@Nullable
|
||||
@ -27,7 +29,7 @@ public class Argument {
|
||||
private ReferencedEntityKey refEntityKey;
|
||||
private String defaultValue;
|
||||
|
||||
private int limit;
|
||||
private long timeWindow;
|
||||
private Integer limit;
|
||||
private Long timeWindow;
|
||||
|
||||
}
|
||||
|
||||
@ -15,10 +15,12 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.data.cf.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
|
||||
@Data
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class Output {
|
||||
|
||||
private String name;
|
||||
|
||||
@ -15,18 +15,18 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.data.cf.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public class ReferencedEntityKey {
|
||||
|
||||
private String key;
|
||||
private ArgumentType type;
|
||||
private AttributeScope scope;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -62,9 +62,9 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
|
||||
try {
|
||||
TenantId tenantId = calculatedField.getTenantId();
|
||||
log.trace("Executing save calculated field, [{}]", calculatedField);
|
||||
updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis());
|
||||
CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField);
|
||||
createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField);
|
||||
updateDebugSettings(tenantId, calculatedField, System.currentTimeMillis());
|
||||
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId())
|
||||
.entity(savedCalculatedField).oldEntity(oldCalculatedField).created(calculatedField.getId() == null).build());
|
||||
return savedCalculatedField;
|
||||
|
||||
@ -35,7 +35,7 @@ public interface CalculatedFieldDebugEventRepository extends EventRepository<Cal
|
||||
List<CalculatedFieldDebugEventEntity> findLatestEvents(@Param("tenantId") UUID tenantId, @Param("entityId") UUID entityId, @Param("limit") int limit);
|
||||
|
||||
@Override
|
||||
@Query("SELECT e FROM RuleNodeDebugEventEntity e WHERE " +
|
||||
@Query("SELECT e FROM CalculatedFieldDebugEventEntity e WHERE " +
|
||||
"e.tenantId = :tenantId " +
|
||||
"AND e.entityId = :entityId " +
|
||||
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
|
||||
|
||||
@ -955,10 +955,10 @@ CREATE TABLE IF NOT EXISTS cf_debug_event (
|
||||
id uuid NOT NULL,
|
||||
tenant_id uuid NOT NULL ,
|
||||
ts bigint NOT NULL,
|
||||
entity_id uuid NOT NULL,
|
||||
entity_id uuid NOT NULL, -- calculated field id
|
||||
service_id varchar,
|
||||
cf_id uuid NOT NULL,
|
||||
e_entity_id uuid,
|
||||
e_entity_id uuid, -- target entity id
|
||||
e_entity_type varchar,
|
||||
e_msg_id uuid,
|
||||
e_msg_type varchar,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user