added validation and fixed timeouts
This commit is contained in:
parent
f55967ee98
commit
d7f61f0789
@ -134,14 +134,20 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
||||
public void process(CalculatedFieldEntityDeleteMsg msg) {
|
||||
log.info("[{}] Processing CF entity delete msg.", msg.getEntityId());
|
||||
if (this.entityId.equals(msg.getEntityId())) {
|
||||
MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback());
|
||||
states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback));
|
||||
ctx.stop(ctx.getSelf());
|
||||
if (states.isEmpty()) {
|
||||
msg.getCallback().onSuccess();
|
||||
} else {
|
||||
MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback());
|
||||
states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback));
|
||||
ctx.stop(ctx.getSelf());
|
||||
}
|
||||
} else {
|
||||
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
|
||||
var state = states.remove(cfId);
|
||||
if (state != null) {
|
||||
cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), msg.getCallback());
|
||||
} else {
|
||||
msg.getCallback().onSuccess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,7 +199,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
if (fieldsCount > 0) {
|
||||
MultipleTbCallback multiCallback = new MultipleTbCallback(fieldsCount, callback);
|
||||
var entityId = msg.getEntityId();
|
||||
oldProfileCfs.forEach(ctx -> deleteCfForEntity(entityId, ctx.getCfId(), callback));
|
||||
oldProfileCfs.forEach(ctx -> deleteCfForEntity(entityId, ctx.getCfId(), multiCallback));
|
||||
newProfileCfs.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback));
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
@ -306,7 +306,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
||||
if (!entityIds.isEmpty()) {
|
||||
//TODO: no need to do this if we cache all created actors and know which one belong to us;
|
||||
var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
|
||||
entityIds.forEach(id -> deleteCfForEntity(entityId, cfId, multiCallback));
|
||||
entityIds.forEach(id -> deleteCfForEntity(id, cfId, multiCallback));
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.controller;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.media.ArraySchema;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
@ -160,7 +161,12 @@ public class TenantProfileController extends BaseController {
|
||||
" \"rpcTtlDays\": 0,\n" +
|
||||
" \"queueStatsTtlDays\": 0,\n" +
|
||||
" \"ruleEngineExceptionsTtlDays\": 0,\n" +
|
||||
" \"warnThreshold\": 0\n" +
|
||||
" \"warnThreshold\": 0,\n" +
|
||||
" \"maxCalculatedFieldsPerEntity\": 5,\n" +
|
||||
" \"maxArgumentsPerCF\": 10,\n" +
|
||||
" \"maxDataPointsPerRollingArg\": 1000,\n" +
|
||||
" \"maxStateSizeInKBytes\": 32,\n" +
|
||||
" \"maxSingleValueArgumentSizeInKBytes\": 2" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" \"default\": false\n" +
|
||||
@ -172,7 +178,7 @@ public class TenantProfileController extends BaseController {
|
||||
@RequestMapping(value = "/tenantProfile", method = RequestMethod.POST)
|
||||
@ResponseBody
|
||||
public TenantProfile saveTenantProfile(@Parameter(description = "A JSON value representing the tenant profile.")
|
||||
@RequestBody TenantProfile tenantProfile) throws ThingsboardException {
|
||||
@Valid @RequestBody TenantProfile tenantProfile) throws ThingsboardException {
|
||||
TenantProfile oldProfile;
|
||||
if (tenantProfile.getId() == null) {
|
||||
accessControlService.checkPermission(getCurrentUser(), Resource.TENANT_PROFILE, Operation.CREATE);
|
||||
|
||||
@ -273,7 +273,8 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
||||
long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow();
|
||||
long startTs = currentTime - timeWindow;
|
||||
long maxDataPoints = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
|
||||
int limit = argument.getLimit() == 0 ? (int) maxDataPoints : argument.getLimit();
|
||||
int argumentLimit = argument.getLimit();
|
||||
int limit = argumentLimit == 0 || argumentLimit > maxDataPoints ? (int) maxDataPoints : argument.getLimit();
|
||||
|
||||
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getRefEntityKey().getKey(), startTs, currentTime, 0, limit, Aggregation.NONE);
|
||||
ListenableFuture<List<TsKvEntry>> tsRollingFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
|
||||
|
||||
@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
@ -58,6 +59,7 @@ public class TenantProfile extends BaseData<TenantProfileId> implements HasName
|
||||
@Schema(description = "If enabled, will push all messages related to this tenant and processed by the rule engine into separate queue. " +
|
||||
"Useful for complex microservices deployments, to isolate processing of the data for specific tenants", example = "false")
|
||||
private boolean isolatedTbRuleEngine;
|
||||
@Valid
|
||||
@Schema(description = "Complex JSON object that contains profile settings: queue configs, max devices, max assets, rate limits, etc.")
|
||||
private transient TenantProfileData profileData;
|
||||
@JsonIgnore
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.common.data.tenant.profile;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.Min;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
@ -135,10 +136,16 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
|
||||
|
||||
private double warnThreshold;
|
||||
|
||||
@Schema(example = "5")
|
||||
private long maxCalculatedFieldsPerEntity = 5;
|
||||
@Schema(example = "10")
|
||||
private long maxArgumentsPerCF = 10;
|
||||
@Min(value = 0, message = "must be at least 0")
|
||||
@Schema(example = "1000")
|
||||
private long maxDataPointsPerRollingArg = 1000;
|
||||
@Schema(example = "32")
|
||||
private long maxStateSizeInKBytes = 32;
|
||||
@Schema(example = "2")
|
||||
private long maxSingleValueArgumentSizeInKBytes = 2;
|
||||
|
||||
@Override
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.common.data.tenant.profile;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.Data;
|
||||
|
||||
import java.io.Serializable;
|
||||
@ -27,6 +28,7 @@ public class TenantProfileData implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -3642550257035920976L;
|
||||
|
||||
@Valid
|
||||
@Schema(description = "Complex JSON object that contains profile settings: max devices, max assets, rate limits, etc.")
|
||||
private TenantProfileConfiguration configuration;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user