State storage improvements

This commit is contained in:
Andrii Shvaika 2025-02-18 16:55:28 +02:00
parent be12e5b985
commit 618d350791
6 changed files with 62 additions and 42 deletions

View File

@ -107,16 +107,20 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
var cfCtx = msg.getCtx();
var ctx = msg.getCtx();
if (msg.isForceReinit()) {
log.info("Force reinitialization of CF: [{}].", cfCtx.getCfId());
states.remove(cfCtx.getCfId());
log.info("Force reinitialization of CF: [{}].", ctx.getCfId());
states.remove(ctx.getCfId());
}
try {
var cfState = getOrInitState(cfCtx);
processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback());
var state = getOrInitState(ctx);
if (!state.isSizeExceedsLimit()) {
processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback());
} else {
throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
}
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build();
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
@ -214,6 +218,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
state = getOrInitState(ctx);
justRestored = true;
}
if (state.isSizeOk()) {
if (state.updateState(newArgValues) || justRestored) {
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
@ -221,6 +226,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
} else {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build();
}
}
@SneakyThrows
@ -235,7 +243,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSizeInKBytes());
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize());
states.put(ctx.getCfId(), state);
}
return state;
@ -244,18 +252,29 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
@SneakyThrows
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
if (state.isReady() && ctx.isInitialized()) {
boolean stateSizeOk;
if (ctx.isInitialized() && state.isReady()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk();
if (stateSizeOk) {
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
}
}
} else {
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk();
if (stateSizeOk) {
callback.onSuccess(); // State was updated but no calculation performed;
}
}
if (stateSizeOk) {
cfStateService.persistState(ctxId, state, callback);
} else {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build();
}
}
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {

View File

@ -94,17 +94,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
this.ctx = ctx;
}
public void onFieldInitMsg(CalculatedFieldInitMsg msg) {
public void onFieldInitMsg(CalculatedFieldInitMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing CF init message.", msg.getCf().getId());
var cf = msg.getCf();
var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
try {
cfCtx.init();
} catch (Exception e) {
log.debug("[{}] Failed to initialize CF context.", cf.getId(), e);
if (DebugModeUtil.isDebugAllAvailable(cf)) {
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage());
}
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
}
calculatedFields.put(cf.getId(), cfCtx);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
@ -135,7 +132,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) {
public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException {
log.info("Processing entity lifecycle event: [{}] for entity: [{}]", msg.getData().getEvent(), msg.getData().getEntityId());
var entityType = msg.getData().getEntityId().getEntityType();
var event = msg.getData().getEvent();
@ -220,7 +217,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback));
}
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) {
private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
if (calculatedFields.containsKey(cfId)) {
log.warn("[{}] CF was already initialized [{}]", tenantId, cfId);
@ -235,10 +232,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
try {
cfCtx.init();
} catch (Exception e) {
log.debug("[{}] Failed to initialize CF context.", cf.getId(), e);
if (DebugModeUtil.isDebugAllAvailable(cf)) {
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage());
}
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
}
calculatedFields.put(cf.getId(), cfCtx);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
@ -250,7 +244,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private void onCfUpdated(ComponentLifecycleMsg msg, TbCallback callback) {
private void onCfUpdated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
var cfId = new CalculatedFieldId(msg.getEntityId().getId());
var oldCfCtx = calculatedFields.get(cfId);
if (oldCfCtx == null) {
@ -265,9 +259,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
try {
newCfCtx.init();
} catch (Exception e) {
if (DebugModeUtil.isDebugAllAvailable(newCf)) {
systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e.getMessage());
}
throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
}
calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());

View File

@ -34,7 +34,7 @@ public abstract class AbstractCalculatedFieldStateService implements CalculatedF
@Override
public final void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
if (state.isStateTooLarge()) {
if (state.isSizeExceedsLimit()) {
throw new CalculatedFieldStateException("State size exceeds the maximum allowed limit. The state will not be persisted to RocksDB.");
}
doPersist(stateId, toProto(stateId, state), callback);

View File

@ -31,7 +31,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected List<String> requiredArguments;
protected Map<String, ArgumentEntry> arguments;
protected boolean stateTooLarge;
protected boolean sizeExceedsLimit;
public BaseCalculatedFieldState(List<String> requiredArguments) {
this.requiredArguments = requiredArguments;
@ -75,9 +75,9 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
@Override
public void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize) {
if (!stateTooLarge && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) {
if (!sizeExceedsLimit && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) {
arguments.clear();
setStateTooLarge(true);
sizeExceedsLimit = true;
}
}

View File

@ -70,7 +70,7 @@ public class CalculatedFieldCtx {
private boolean initialized;
private long maxDataPointsPerRollingArg;
private long maxStateSizeInKBytes;
private long maxStateSize;
public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService, ApiLimitService apiLimitService) {
this.calculatedField = calculatedField;
@ -103,7 +103,7 @@ public class CalculatedFieldCtx {
this.tbelInvokeService = tbelInvokeService;
this.maxDataPointsPerRollingArg = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxDataPointsPerRollingArg);
this.maxStateSizeInKBytes = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes);
this.maxStateSize = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxStateSizeInKBytes) * 1024;
}
public void init() {
@ -224,4 +224,8 @@ public class CalculatedFieldCtx {
return typeChanged || argumentsChanged;
}
public String getSizeExceedsLimitMessage() {
return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!";
}
}

View File

@ -51,7 +51,12 @@ public interface CalculatedFieldState {
@JsonIgnore
boolean isReady();
boolean isStateTooLarge();
boolean isSizeExceedsLimit();
@JsonIgnore
default boolean isSizeOk() {
return !isSizeExceedsLimit();
}
void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize);