Improvements

This commit is contained in:
Andrii Shvaika 2025-02-18 14:42:13 +02:00
parent 3cdc933b67
commit be12e5b985
8 changed files with 145 additions and 34 deletions

View File

@ -805,7 +805,7 @@ public class ActorSystemContext {
Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
}
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) {
public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map<String, ArgumentEntry> arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) {
if (checkLimits(tenantId)) {
try {
CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
@ -829,8 +829,8 @@ public class ActorSystemContext {
if (result != null) {
eventBuilder.result(result);
}
if (error != null) {
eventBuilder.error(error.getMessage());
if (errorMessage != null) {
eventBuilder.error(errorMessage);
}
ListenableFuture<Void> future = eventService.saveAsync(eventBuilder.build());

View File

@ -0,0 +1,70 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
@Slf4j
public abstract class AbstractCalculatedFieldActor extends ContextAwareActor {
protected final TenantId tenantId;
public AbstractCalculatedFieldActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.tenantId = tenantId;
}
@Override
protected boolean doProcess(TbActorMsg msg) {
if (msg instanceof ToCalculatedFieldSystemMsg cfm) {
Exception cause;
try {
return doProcessCfMsg(cfm);
} catch (CalculatedFieldException cfe) {
if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) {
String message;
if (cfe.getErrorMessage() != null) {
message = cfe.getErrorMessage();
} else if (cfe.getCause() != null) {
message = cfe.getCause().getMessage();
} else {
message = "N/A";
}
systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message);
}
cause = cfe.getCause();
} catch (Exception e) {
logProcessingException(e);
cause = e;
}
cfm.getCallback().onFailure(cause);
return true;
} else {
return false;
}
}
abstract void logProcessingException(Exception e);
abstract boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException;
}

View File

@ -19,19 +19,18 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
@Slf4j
public class CalculatedFieldEntityActor extends ContextAwareActor {
public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
private final CalculatedFieldEntityMessageProcessor processor;
CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
super(systemContext);
super(systemContext, tenantId);
this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId);
}
@ -49,7 +48,7 @@ public class CalculatedFieldEntityActor extends ContextAwareActor {
}
@Override
protected boolean doProcess(TbActorMsg msg) {
protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException {
switch (msg.getMsgType()) {
case CF_PARTITIONS_CHANGE_MSG:
processor.process((CalculatedFieldPartitionChangeMsg) msg);
@ -75,4 +74,8 @@ public class CalculatedFieldEntityActor extends ContextAwareActor {
return true;
}
@Override
void logProcessingException(Exception e) {
log.warn("[{}][{}] Processing failure", tenantId, processor.entityId, e);
}
}

View File

@ -105,7 +105,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
public void process(EntityInitCalculatedFieldMsg msg) {
public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
var cfCtx = msg.getCtx();
if (msg.isForceReinit()) {
@ -116,11 +116,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
var cfState = getOrInitState(cfCtx);
processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback());
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(cfCtx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, cfCtx.getCfId(), entityId, null, null, null, null, e);
}
msg.getCallback().onFailure(e);
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build();
}
}
@ -139,7 +135,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
public void process(EntityCalculatedFieldTelemetryMsg msg) {
public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing CF telemetry msg.", msg.getEntityId());
var proto = msg.getProto();
var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
@ -154,7 +150,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) {
public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId());
var proto = msg.getProto();
var ctx = msg.getCtx();
@ -173,14 +169,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
callback.onFailure(e);
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection<CalculatedFieldId> cfIds, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection<CalculatedFieldId> cfIds, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
try {
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
@ -194,10 +187,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
} catch (Exception e) {
if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e);
}
callback.onFailure(e);
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
}
@ -245,6 +235,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSizeInKBytes());
states.put(ctx.getCfId(), state);
}
return state;
@ -261,6 +252,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
}
} else {
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
callback.onSuccess(); // State was updated but no calculation performed;
}
cfStateService.persistState(ctxId, state, callback);

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Builder;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.Map;
import java.util.UUID;
@Getter
@Builder
public class CalculatedFieldException extends Exception {
private final CalculatedFieldCtx ctx;
private final EntityId eventEntity;
private final UUID msgId;
private final TbMsgType msgType;
private Map<String, ArgumentEntry> arguments;
private String errorMessage;
private Exception cause;
}

View File

@ -19,9 +19,8 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
@ -31,12 +30,12 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
* Created by ashvayka on 15.03.18.
*/
@Slf4j
public class CalculatedFieldManagerActor extends ContextAwareActor {
public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
private final CalculatedFieldManagerMessageProcessor processor;
public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
super(systemContext, tenantId);
this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId);
}
@ -54,7 +53,7 @@ public class CalculatedFieldManagerActor extends ContextAwareActor {
}
@Override
protected boolean doProcess(TbActorMsg msg) {
protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException {
switch (msg.getMsgType()) {
case CF_PARTITIONS_CHANGE_MSG:
processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg);
@ -82,4 +81,10 @@ public class CalculatedFieldManagerActor extends ContextAwareActor {
}
return true;
}
@Override
void logProcessingException(Exception e) {
log.warn("[{}] Processing failure", tenantId, e);
}
}

View File

@ -103,7 +103,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} catch (Exception e) {
log.debug("[{}] Failed to initialize CF context.", cf.getId(), e);
if (DebugModeUtil.isDebugAllAvailable(cf)) {
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e);
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage());
}
}
calculatedFields.put(cf.getId(), cfCtx);
@ -237,7 +237,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} catch (Exception e) {
log.debug("[{}] Failed to initialize CF context.", cf.getId(), e);
if (DebugModeUtil.isDebugAllAvailable(cf)) {
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e);
systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage());
}
}
calculatedFields.put(cf.getId(), cfCtx);
@ -266,7 +266,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
newCfCtx.init();
} catch (Exception e) {
if (DebugModeUtil.isDebugAllAvailable(newCf)) {
systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e);
systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e.getMessage());
}
}
calculatedFields.put(newCf.getId(), newCfCtx);

View File

@ -75,7 +75,8 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
@Override
public void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize) {
if (maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) {
if (!stateTooLarge && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) {
arguments.clear();
setStateTooLarge(true);
}
}