ThingsBoard Actor Init improvements

This commit is contained in:
Andrii Shvaika 2020-06-08 10:58:22 +03:00
parent cc4f746b1d
commit a6733c42c4
12 changed files with 139 additions and 18 deletions

View File

@ -20,6 +20,7 @@ import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@ -39,7 +40,7 @@ public class DeviceActor extends ContextAwareActor {
}
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
try {
@ -47,6 +48,7 @@ public class DeviceActor extends ContextAwareActor {
log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
} catch (Exception e) {
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
throw new TbActorException("Failed to initialize device actor", e);
}
}

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.actors.service;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.actors.stats.StatsPersistMsg;
import org.thingsboard.server.common.data.id.EntityId;
@ -48,13 +50,13 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
abstract protected P createProcessor(TbActorCtx ctx);
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
this.processor = createProcessor(ctx);
initProcessor(ctx);
}
protected void initProcessor(TbActorCtx ctx) {
protected void initProcessor(TbActorCtx ctx) throws TbActorException {
try {
log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
processor.start(ctx);
@ -63,10 +65,10 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
scheduleStatsPersistTick();
}
} catch (Exception e) {
log.warn("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType());
log.warn("Error:", e);
log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);
logAndPersist("OnStart", e, true);
logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
throw new TbActorException("Failed to init actor", e);
}
}
@ -158,11 +160,11 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP
errorsOccurred++;
String componentName = processor != null ? processor.getComponentName() : "Unknown";
if (critical) {
log.warn("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
log.warn("Critical Error: ", e);
} else {
log.debug("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
log.debug("Debug Error: ", e);
log.debug("Critical Error: ", e);
} else {
log.trace("[{}][{}][{}] Failed to process method: {}", id, tenantId, componentName, method);
log.trace("Debug Error: ", e);
}
long ts = System.currentTimeMillis();
if (ts - lastPersistedErrorTs > getErrorPersistFrequency()) {

View File

@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbActorNotRegisteredException;
import org.thingsboard.server.actors.TbActorRef;
@ -62,7 +63,7 @@ public class TenantActor extends RuleChainManagerActor {
boolean cantFindTenant = false;
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.info("[{}] Starting tenant actor.", tenantId);
try {
@ -93,6 +94,8 @@ public class TenantActor extends RuleChainManagerActor {
}
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
// TODO: throw this in 3.1?
// throw new TbActorException("Failed to init actor", e);
}
}

View File

@ -160,7 +160,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
});
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue()));
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
mainConsumer.commit();

View File

@ -126,7 +126,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
scriptIdToNameMap.put(scriptId, functionName);
return scriptId;
} catch (Exception e) {
log.warn("Failed to compile JS script: {}", e.getMessage(), e);
log.debug("Failed to compile JS script: {}", e.getMessage(), e);
throw new ExecutionException(e);
}
});

View File

@ -23,7 +23,7 @@ public abstract class AbstractTbActor implements TbActor {
protected TbActorCtx ctx;
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
this.ctx = ctx;
}

View File

@ -23,14 +23,14 @@ public interface TbActor {
TbActorRef getActorRef();
default void init(TbActorCtx ctx) {
default void init(TbActorCtx ctx) throws TbActorException {
}
default void destroy() {
default void destroy() throws TbActorException {
}
default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(5000);
return InitFailureStrategy.retryWithDelay(5000 * attempt);
}
default ProcessFailureStrategy onProcessFailure(Throwable t) {

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
public class TbActorException extends Exception {
public TbActorException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -148,6 +148,25 @@ public class ActorSystemTest {
Assert.assertEquals(2, testCtx.getInvocationCount().get());
}
@Test
public void testFailedInit() throws InterruptedException {
actorSystem.createDispatcher(ROOT_DISPATCHER, Executors.newWorkStealingPool(parallelism));
ActorTestCtx testCtx1 = getActorTestCtx(1);
ActorTestCtx testCtx2 = getActorTestCtx(1);
TbActorRef actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator(
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx1, 1, 3000));
TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new FailedToInitActor.FailedToInitActorCreator(
new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2, 2, 1));
actorId1.tell(new IntTbActorMsg(42));
actorId2.tell(new IntTbActorMsg(42));
Assert.assertFalse(testCtx1.getLatch().await(2, TimeUnit.SECONDS));
Assert.assertTrue(testCtx2.getLatch().await(1, TimeUnit.SECONDS));
Assert.assertTrue(testCtx1.getLatch().await(3, TimeUnit.SECONDS));
}
public void testActorsAndMessages(int actorsCount, int msgNumber, int times) throws InterruptedException {
Random random = new Random();

View File

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FailedToInitActor extends TestRootActor {
int retryAttempts;
int retryDelay;
int attempts = 0;
public FailedToInitActor(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) {
super(actorId, testCtx);
this.retryAttempts = retryAttempts;
this.retryDelay = retryDelay;
}
@Override
public void init(TbActorCtx ctx) throws TbActorException {
if (attempts < retryAttempts) {
attempts++;
throw new TbActorException("Test attempt", new RuntimeException());
} else {
super.init(ctx);
}
}
@Override
public InitFailureStrategy onInitFailure(int attempt, Throwable t) {
return InitFailureStrategy.retryWithDelay(retryDelay);
}
public static class FailedToInitActorCreator implements TbActorCreator {
private final TbActorId actorId;
private final ActorTestCtx testCtx;
private final int retryAttempts;
private final int retryDelay;
public FailedToInitActorCreator(TbActorId actorId, ActorTestCtx testCtx, int retryAttempts, int retryDelay) {
this.actorId = actorId;
this.testCtx = testCtx;
this.retryAttempts = retryAttempts;
this.retryDelay = retryDelay;
}
@Override
public TbActorId createActorId() {
return actorId;
}
@Override
public TbActor createActor() {
return new FailedToInitActor(actorId, testCtx, retryAttempts, retryDelay);
}
}
}

View File

@ -25,7 +25,7 @@ public class SlowInitActor extends TestRootActor {
}
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {

View File

@ -37,7 +37,7 @@ public class TestRootActor extends AbstractTbActor {
}
@Override
public void init(TbActorCtx ctx) {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
initialized = true;
}