Add unrecoverable initialization error for actors. Fix timeouts on missing rule chain id in the rule chain input node
This commit is contained in:
parent
405548d44a
commit
649898566c
@ -35,6 +35,7 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||
import org.thingsboard.rule.engine.api.ScriptEngine;
|
||||
import org.thingsboard.rule.engine.api.SmsService;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.rule.engine.api.slack.SlackService;
|
||||
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
|
||||
@ -843,9 +844,9 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkTenantEntity(EntityId entityId) {
|
||||
public void checkTenantEntity(EntityId entityId) throws TbNodeException {
|
||||
if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
|
||||
throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.");
|
||||
throw new TbNodeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.", true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbEntityActorId;
|
||||
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
|
||||
import org.thingsboard.server.actors.service.ContextAwareActor;
|
||||
import org.thingsboard.server.actors.service.DefaultActorService;
|
||||
import org.thingsboard.server.actors.shared.RuleChainErrorActor;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainType;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
|
||||
import java.util.function.Function;
|
||||
@ -86,7 +88,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
|
||||
() -> DefaultActorService.RULE_DISPATCHER_NAME,
|
||||
() -> {
|
||||
RuleChain ruleChain = provider.apply(ruleChainId);
|
||||
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
|
||||
if (ruleChain == null) {
|
||||
return new RuleChainErrorActor.ActorCreator(systemContext, tenantId,
|
||||
new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!"));
|
||||
} else {
|
||||
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,78 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.actors.shared;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.actors.TbActor;
|
||||
import org.thingsboard.server.actors.TbActorId;
|
||||
import org.thingsboard.server.actors.TbStringActorId;
|
||||
import org.thingsboard.server.actors.service.ContextAwareActor;
|
||||
import org.thingsboard.server.actors.service.ContextBasedCreator;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
public class RuleChainErrorActor extends ContextAwareActor {
|
||||
|
||||
private final TenantId tenantId;
|
||||
private final RuleEngineException error;
|
||||
|
||||
private RuleChainErrorActor(ActorSystemContext systemContext, TenantId tenantId, RuleEngineException error) {
|
||||
super(systemContext);
|
||||
this.tenantId = tenantId;
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doProcess(TbActorMsg msg) {
|
||||
if (msg instanceof RuleChainAwareMsg) {
|
||||
log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg);
|
||||
var rcMsg = (RuleChainAwareMsg) msg;
|
||||
rcMsg.getMsg().getCallback().onFailure(error);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static class ActorCreator extends ContextBasedCreator {
|
||||
|
||||
private final TenantId tenantId;
|
||||
private final RuleEngineException error;
|
||||
|
||||
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) {
|
||||
super(context);
|
||||
this.tenantId = tenantId;
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbActorId createActorId() {
|
||||
return new TbStringActorId(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbActor createActor() {
|
||||
return new RuleChainErrorActor(context, tenantId, error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -19,6 +19,7 @@ import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.TbActorError;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.TbActorStopReason;
|
||||
|
||||
@ -69,9 +70,14 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
|
||||
InitFailureStrategy strategy;
|
||||
int attemptIdx = attempt + 1;
|
||||
InitFailureStrategy strategy = actor.onInitFailure(attempt, t);
|
||||
if (isUnrecoverable(t)) {
|
||||
strategy = InitFailureStrategy.stop();
|
||||
} else {
|
||||
log.debug("[{}] Failed to init actor, attempt: {}", selfId, attempt, t);
|
||||
strategy = actor.onInitFailure(attempt, t);
|
||||
}
|
||||
if (strategy.isStop() || (settings.getMaxActorInitAttempts() > 0 && attemptIdx > settings.getMaxActorInitAttempts())) {
|
||||
log.info("[{}] Failed to init actor, attempt {}, going to stop attempts.", selfId, attempt, t);
|
||||
stopReason = TbActorStopReason.INIT_FAILED;
|
||||
@ -88,6 +94,13 @@ public final class TbActorMailbox implements TbActorCtx {
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isUnrecoverable(Throwable t) {
|
||||
if (t instanceof TbActorException && t.getCause() != null) {
|
||||
t = t.getCause();
|
||||
}
|
||||
return t instanceof TbActorError && ((TbActorError) t).isUnrecoverable();
|
||||
}
|
||||
|
||||
private void enqueue(TbActorMsg msg, boolean highPriority) {
|
||||
if (!destroyInProgress.get()) {
|
||||
if (highPriority) {
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg;
|
||||
|
||||
public interface TbActorError {
|
||||
|
||||
boolean isUnrecoverable();
|
||||
|
||||
}
|
||||
@ -17,9 +17,12 @@ package org.thingsboard.server.common.msg.aware;
|
||||
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
public interface RuleChainAwareMsg extends TbActorMsg {
|
||||
|
||||
RuleChainId getRuleChainId();
|
||||
|
||||
TbMsg getMsg();
|
||||
|
||||
}
|
||||
|
||||
@ -210,7 +210,7 @@ public interface TbContext {
|
||||
|
||||
void schedule(Runnable runnable, long delay, TimeUnit timeUnit);
|
||||
|
||||
void checkTenantEntity(EntityId entityId);
|
||||
void checkTenantEntity(EntityId entityId) throws TbNodeException;
|
||||
|
||||
boolean isLocalEntity(EntityId entityId);
|
||||
|
||||
|
||||
@ -15,17 +15,33 @@
|
||||
*/
|
||||
package org.thingsboard.rule.engine.api;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.server.common.msg.TbActorError;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 19.01.18.
|
||||
*/
|
||||
public class TbNodeException extends Exception {
|
||||
public class TbNodeException extends Exception implements TbActorError {
|
||||
|
||||
@Getter
|
||||
private final boolean unrecoverable;
|
||||
|
||||
public TbNodeException(String message) {
|
||||
this(message, false);
|
||||
}
|
||||
|
||||
public TbNodeException(String message, boolean unrecoverable) {
|
||||
super(message);
|
||||
this.unrecoverable = unrecoverable;
|
||||
}
|
||||
|
||||
public TbNodeException(Exception e) {
|
||||
this(e, false);
|
||||
}
|
||||
|
||||
public TbNodeException(Exception e, boolean unrecoverable) {
|
||||
super(e);
|
||||
this.unrecoverable = unrecoverable;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ public class TbNodeUtils {
|
||||
try {
|
||||
return mapper.treeToValue(configuration.getData(), clazz);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new TbNodeException(e);
|
||||
throw new TbNodeException(e, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -67,7 +67,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
|
||||
if (!this.config.isDynamicSeverity()) {
|
||||
this.notDynamicAlarmSeverity = EnumUtils.getEnum(AlarmSeverity.class, this.config.getSeverity());
|
||||
if (this.notDynamicAlarmSeverity == null) {
|
||||
throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity());
|
||||
throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user