diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 1a5e619d2c..20f6107c06 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -805,7 +805,7 @@ public class ActorSystemContext { Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); } - public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, Throwable error) { + public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) { if (checkLimits(tenantId)) { try { CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() @@ -829,8 +829,8 @@ public class ActorSystemContext { if (result != null) { eventBuilder.result(result); } - if (error != null) { - eventBuilder.error(error.getMessage()); + if (errorMessage != null) { + eventBuilder.error(errorMessage); } ListenableFuture future = eventService.saveAsync(eventBuilder.build()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java new file mode 100644 index 0000000000..5c061e3613 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java @@ -0,0 +1,70 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.DebugModeUtil; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.service.ContextAwareActor; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +@Slf4j +public abstract class AbstractCalculatedFieldActor extends ContextAwareActor { + + protected final TenantId tenantId; + + public AbstractCalculatedFieldActor(ActorSystemContext systemContext, TenantId tenantId) { + super(systemContext); + this.tenantId = tenantId; + } + + @Override + protected boolean doProcess(TbActorMsg msg) { + if (msg instanceof ToCalculatedFieldSystemMsg cfm) { + Exception cause; + try { + return doProcessCfMsg(cfm); + } catch (CalculatedFieldException cfe) { + if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) { + String message; + if (cfe.getErrorMessage() != null) { + message = cfe.getErrorMessage(); + } else if (cfe.getCause() != null) { + message = cfe.getCause().getMessage(); + } else { + message = "N/A"; + } + systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message); + } + cause = cfe.getCause(); + } catch (Exception e) { + logProcessingException(e); + cause = e; + } + cfm.getCallback().onFailure(cause); + return true; + } else { + return false; + } + } + + abstract void logProcessingException(Exception e); + + abstract boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException; + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index 5e4aded41f..e73af0ff8c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -19,19 +19,18 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; -import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; @Slf4j -public class CalculatedFieldEntityActor extends ContextAwareActor { +public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { private final CalculatedFieldEntityMessageProcessor processor; CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { - super(systemContext); + super(systemContext, tenantId); this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId); } @@ -49,7 +48,7 @@ public class CalculatedFieldEntityActor extends ContextAwareActor { } @Override - protected boolean doProcess(TbActorMsg msg) { + protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException { switch (msg.getMsgType()) { case CF_PARTITIONS_CHANGE_MSG: processor.process((CalculatedFieldPartitionChangeMsg) msg); @@ -75,4 +74,8 @@ public class CalculatedFieldEntityActor extends ContextAwareActor { return true; } + @Override + void logProcessingException(Exception e) { + log.warn("[{}][{}] Processing failure", tenantId, processor.entityId, e); + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 34cf5695c1..7211ac0db9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -105,7 +105,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - public void process(EntityInitCalculatedFieldMsg msg) { + public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException { log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId()); var cfCtx = msg.getCtx(); if (msg.isForceReinit()) { @@ -116,11 +116,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM var cfState = getOrInitState(cfCtx); processStateIfReady(cfCtx, Collections.singletonList(cfCtx.getCfId()), cfState, null, null, msg.getCallback()); } catch (Exception e) { - - if (DebugModeUtil.isDebugFailuresAvailable(cfCtx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, cfCtx.getCfId(), entityId, null, null, null, null, e); - } - msg.getCallback().onFailure(e); + throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build(); } } @@ -139,7 +135,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - public void process(EntityCalculatedFieldTelemetryMsg msg) { + public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { log.info("[{}] Processing CF telemetry msg.", msg.getEntityId()); var proto = msg.getProto(); var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); @@ -154,7 +150,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) { + public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException { log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId()); var proto = msg.getProto(); var ctx = msg.getCtx(); @@ -173,14 +169,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); - } - callback.onFailure(e); + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } } - private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection cfIds, List cfIdList, MultipleTbCallback callback) { + private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection cfIds, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { try { if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); @@ -194,10 +187,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } } catch (Exception e) { - if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, null, null, null, null, e); - } - callback.onFailure(e); + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } } @@ -245,6 +235,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, // but this will significantly complicate the code. state = stateFuture.get(1, TimeUnit.MINUTES); + state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSizeInKBytes()); states.put(ctx.getCfId(), state); } return state; @@ -261,6 +252,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); } } else { + state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes()); callback.onSuccess(); // State was updated but no calculation performed; } cfStateService.persistState(ctxId, state, callback); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java new file mode 100644 index 0000000000..a536f10ddd --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Builder; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +import java.util.Map; +import java.util.UUID; + +@Getter +@Builder +public class CalculatedFieldException extends Exception { + + private final CalculatedFieldCtx ctx; + private final EntityId eventEntity; + private final UUID msgId; + private final TbMsgType msgType; + private Map arguments; + private String errorMessage; + private Exception cause; + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 497602c93b..03069c137d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -19,9 +19,8 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; -import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; @@ -31,12 +30,12 @@ import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; * Created by ashvayka on 15.03.18. */ @Slf4j -public class CalculatedFieldManagerActor extends ContextAwareActor { +public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { private final CalculatedFieldManagerMessageProcessor processor; public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) { - super(systemContext); + super(systemContext, tenantId); this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId); } @@ -54,7 +53,7 @@ public class CalculatedFieldManagerActor extends ContextAwareActor { } @Override - protected boolean doProcess(TbActorMsg msg) { + protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException { switch (msg.getMsgType()) { case CF_PARTITIONS_CHANGE_MSG: processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg); @@ -82,4 +81,10 @@ public class CalculatedFieldManagerActor extends ContextAwareActor { } return true; } + + @Override + void logProcessingException(Exception e) { + log.warn("[{}] Processing failure", tenantId, e); + } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index c3aa378524..5e005935aa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -103,7 +103,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } catch (Exception e) { log.debug("[{}] Failed to initialize CF context.", cf.getId(), e); if (DebugModeUtil.isDebugAllAvailable(cf)) { - systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e); + systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage()); } } calculatedFields.put(cf.getId(), cfCtx); @@ -237,7 +237,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } catch (Exception e) { log.debug("[{}] Failed to initialize CF context.", cf.getId(), e); if (DebugModeUtil.isDebugAllAvailable(cf)) { - systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e); + systemContext.persistCalculatedFieldDebugEvent(cf.getTenantId(), cf.getId(), cf.getEntityId(), null, null, null, null, e.getMessage()); } } calculatedFields.put(cf.getId(), cfCtx); @@ -266,7 +266,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware newCfCtx.init(); } catch (Exception e) { if (DebugModeUtil.isDebugAllAvailable(newCf)) { - systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e); + systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e.getMessage()); } } calculatedFields.put(newCf.getId(), newCfCtx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 5ea4707ff3..2b103b567e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -75,7 +75,8 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { @Override public void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize) { - if (maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) { + if (!stateTooLarge && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) { + arguments.clear(); setStateTooLarge(true); } }