diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 1c18a31f68..c15c87be1c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -127,6 +127,7 @@ import java.io.StringWriter; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Slf4j @@ -763,9 +764,9 @@ public class ActorSystemContext { appActor.tellWithHighPriority(tbActorMsg); } - public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + public ScheduledFuture schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); - getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); + return getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); } public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java index 5aea88d622..bf50273814 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java @@ -29,6 +29,9 @@ import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; + /** * @author Andrew Shvayka */ @@ -41,6 +44,7 @@ public abstract class ComponentActor statsScheduledFuture = null; public ComponentActor(ActorSystemContext systemContext, TenantId tenantId, T id) { super(systemContext); @@ -75,7 +79,7 @@ public abstract class ComponentActor x.cancel(false)); + statsScheduledFuture = null; } catch (Exception e) { log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage()); logAndPersist("OnStop", e, true); diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java index 571d79d18c..a34a8ca537 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java @@ -21,6 +21,7 @@ import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.common.msg.TbActorMsg; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; @Slf4j public abstract class AbstractContextAwareMsgProcessor { @@ -36,8 +37,8 @@ public abstract class AbstractContextAwareMsgProcessor { return systemContext.getScheduler(); } - protected void schedulePeriodicMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs, long periodInMs) { - systemContext.schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs); + protected ScheduledFuture schedulePeriodicMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + return systemContext.schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs); } protected void scheduleMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs) { diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java index 298874cc51..c37b35400c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java @@ -27,6 +27,8 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.RuleNodeException; +import java.util.concurrent.ScheduledFuture; + @Slf4j public abstract class ComponentMsgProcessor extends AbstractContextAwareMsgProcessor { @@ -77,8 +79,8 @@ public abstract class ComponentMsgProcessor extends Abstract start(context); } - public void scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) { - schedulePeriodicMsgWithDelay(context, StatsPersistTick.INSTANCE, statsPersistFrequency, statsPersistFrequency); + public ScheduledFuture scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) { + return schedulePeriodicMsgWithDelay(context, StatsPersistTick.INSTANCE, statsPersistFrequency, statsPersistFrequency); } protected boolean checkMsgValid(TbMsg tbMsg) {