diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index b60f960498..1ed919e922 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -141,6 +141,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -856,9 +857,9 @@ public class ActorSystemContext { appActor.tellWithHighPriority(tbActorMsg); } - public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + public ScheduledFuture schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); - getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); + return getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); } public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java index 5778f02663..7b390cd9f3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ComponentActor.java @@ -29,6 +29,9 @@ import org.thingsboard.server.common.msg.TbActorStopReason; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; + /** * @author Andrew Shvayka */ @@ -41,6 +44,7 @@ public abstract class ComponentActor statsScheduledFuture = null; public ComponentActor(ActorSystemContext systemContext, TenantId tenantId, T id) { super(systemContext); @@ -73,9 +77,9 @@ public abstract class ComponentActor x.cancel(false)); + statsScheduledFuture = null; } catch (Exception e) { log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage()); logAndPersist("OnStop", e, true); diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java index 838bf0ea36..17f432eca5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java @@ -21,6 +21,7 @@ import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.common.msg.TbActorMsg; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; @Slf4j public abstract class AbstractContextAwareMsgProcessor { @@ -36,8 +37,8 @@ public abstract class AbstractContextAwareMsgProcessor { return systemContext.getScheduler(); } - protected void schedulePeriodicMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs, long periodInMs) { - systemContext.schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs); + protected ScheduledFuture schedulePeriodicMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + return systemContext.schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs); } protected void scheduleMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs) { diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java index d48a8f7684..76e5ce7899 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java @@ -27,6 +27,8 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.RuleNodeException; +import java.util.concurrent.ScheduledFuture; + @Slf4j public abstract class ComponentMsgProcessor extends AbstractContextAwareMsgProcessor { @@ -77,8 +79,8 @@ public abstract class ComponentMsgProcessor extends Abstract start(context); } - public void scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) { - schedulePeriodicMsgWithDelay(context, StatsPersistTick.INSTANCE, statsPersistFrequency, statsPersistFrequency); + public ScheduledFuture scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) { + return schedulePeriodicMsgWithDelay(context, StatsPersistTick.INSTANCE, statsPersistFrequency, statsPersistFrequency); } protected boolean checkMsgValid(TbMsg tbMsg) { diff --git a/application/src/test/java/org/thingsboard/server/actors/service/ComponentActorTest.java b/application/src/test/java/org/thingsboard/server/actors/service/ComponentActorTest.java new file mode 100644 index 0000000000..f847ca7db7 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/actors/service/ComponentActorTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.service; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.BDDMockito; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.shared.ComponentMsgProcessor; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.msg.TbActorStopReason; + +import java.util.concurrent.ScheduledFuture; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; + +class ComponentActorTest { + ComponentActor componentActor; + + @BeforeEach + void setUp() { + componentActor = Mockito.mock(ComponentActor.class); + } + + @Test + void scheduleStatsPersistTickTest() { + Assertions.assertNull(componentActor.statsScheduledFuture); + ScheduledFuture statsScheduledFuture = Mockito.mock(ScheduledFuture.class); + ActorSystemContext systemContext = Mockito.mock(ActorSystemContext.class); + ReflectionTestUtils.setField(componentActor, "systemContext", systemContext); + ComponentMsgProcessor processor = Mockito.mock(ComponentMsgProcessor.class); + componentActor.processor = processor; + BDDMockito.willReturn(statsScheduledFuture).given(processor).scheduleStatsPersistTick(any(), anyLong()); + BDDMockito.willCallRealMethod().given(componentActor).scheduleStatsPersistTick(); + + componentActor.scheduleStatsPersistTick(); + + Assertions.assertNotNull(componentActor.statsScheduledFuture); + } + + @Test + void destroyTest() { + ScheduledFuture statsScheduledFuture = Mockito.mock(ScheduledFuture.class); + componentActor.statsScheduledFuture = statsScheduledFuture; + Assertions.assertNotNull(componentActor.statsScheduledFuture); + Throwable cause = new Throwable(); + EntityId id = Mockito.mock(EntityId.class); + ReflectionTestUtils.setField(componentActor, "id", id); + BDDMockito.willCallRealMethod().given(componentActor).destroy(any(), any()); + + componentActor.destroy(TbActorStopReason.STOPPED, cause); + + Mockito.verify(statsScheduledFuture).cancel(false); + Assertions.assertNull(componentActor.statsScheduledFuture); + } + +}