diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java index a166259256..095e5b18dc 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponent.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.scheduler; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardThreadFactory; @@ -26,14 +27,15 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +@Slf4j @Component public class DefaultSchedulerComponent implements SchedulerComponent { - protected ScheduledExecutorService schedulerExecutor; + private ScheduledExecutorService schedulerExecutor; @PostConstruct public void init() { - this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler")); + schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler")); } @PreDestroy @@ -52,10 +54,19 @@ public class DefaultSchedulerComponent implements SchedulerComponent { } public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return schedulerExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); + return schedulerExecutor.scheduleAtFixedRate(() -> runSafely(command), initialDelay, period, unit); } public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return schedulerExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + return schedulerExecutor.scheduleWithFixedDelay(() -> runSafely(command), initialDelay, delay, unit); } + + private void runSafely(Runnable command) { + try { + command.run(); + } catch (Throwable t) { + log.error("Unexpected error occurred while executing task!", t); + } + } + } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java new file mode 100644 index 0000000000..cff306c748 --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java @@ -0,0 +1,95 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.scheduler; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.assertj.core.api.Assertions.assertThat; + +class DefaultSchedulerComponentTest { + + DefaultSchedulerComponent schedulerComponent; + + @BeforeEach + void setup() { + schedulerComponent = new DefaultSchedulerComponent(); + schedulerComponent.init(); + } + + @AfterEach + void cleanup() { + schedulerComponent.destroy(); + } + + @Test + @DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception") + void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + var wasExecutedAtLeastOnce = new AtomicBoolean(false); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new RuntimeException("Unexpected exception"); + } finally { + wasExecutedAtLeastOnce.set(true); + } + }; + + // WHEN + ScheduledFuture future = schedulerComponent.scheduleAtFixedRate(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Wait until command is executed at least once") + .atMost(5, TimeUnit.SECONDS) + .until(wasExecutedAtLeastOnce::get); + + assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse(); + } + + @Test + @DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception") + void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + var wasExecutedAtLeastOnce = new AtomicBoolean(false); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new RuntimeException("Unexpected exception"); + } finally { + wasExecutedAtLeastOnce.set(true); + } + }; + + // WHEN + ScheduledFuture future = schedulerComponent.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Wait until command is executed at least once") + .atMost(5, TimeUnit.SECONDS) + .until(wasExecutedAtLeastOnce::get); + + assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse(); + } + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index f754d6f5b1..ac535d28e1 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -69,7 +69,7 @@ public abstract class AbstractActivityManager implements Activity log.error("Failed to process activity event: provided activity key is null."); return; } - log.debug("Received activity event for key: [{}]", key); + log.debug("Received activity event for key: [{}]. Event time: [{}].", key, newLastRecordedTime); var shouldReport = new AtomicBoolean(false); var lastRecordedTime = new AtomicLong(); @@ -94,7 +94,7 @@ public abstract class AbstractActivityManager implements Activity }); if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { - log.debug("Going to report first activity event for key: [{}].", key); + log.debug("Going to report first activity event for key: [{}]. Event time: [{}].", key, lastRecordedTime.get()); reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { @@ -103,7 +103,7 @@ public abstract class AbstractActivityManager implements Activity @Override public void onFailure(Key key, Throwable t) { - log.debug("Failed to report first activity event for key: [{}].", key, t); + log.debug("Failed to report first activity event for key: [{}]. Event time: [{}].", key, lastRecordedTime.get(), t); } }); } @@ -113,50 +113,59 @@ public abstract class AbstractActivityManager implements Activity public void onReportingPeriodEnd() { log.debug("Going to end reporting period."); for (Map.Entry entry : states.entrySet()) { - var key = entry.getKey(); - var stateWrapper = entry.getValue(); - var currentState = stateWrapper.getState(); - - long lastRecordedTime = currentState.getLastRecordedTime(); - long lastReportedTime = stateWrapper.getLastReportedTime(); - var metadata = currentState.getMetadata(); - - boolean hasExpired; - boolean shouldReport; - - var updatedState = updateState(key, currentState); - if (updatedState != null) { - stateWrapper.setState(updatedState); - lastRecordedTime = updatedState.getLastRecordedTime(); - metadata = updatedState.getMetadata(); - hasExpired = hasExpired(lastRecordedTime); - shouldReport = stateWrapper.getStrategy().onReportingPeriodEnd(); - } else { - states.remove(key); - hasExpired = false; - shouldReport = true; + Key key = entry.getKey(); + ActivityStateWrapper stateWrapper = entry.getValue(); + try { + reportLastEvent(key, stateWrapper); + } catch (Exception e) { + log.error("Failed to report last activity event on reporting period end for key: [{}]. State: [{}].", key, stateWrapper, e); } + } + } - if (hasExpired) { - states.remove(key); - onStateExpiry(key, metadata); - shouldReport = true; - } + private void reportLastEvent(Key key, ActivityStateWrapper stateWrapper) { + var currentState = stateWrapper.getState(); - if (shouldReport && lastReportedTime < lastRecordedTime) { - log.debug("Going to report last activity event for key: [{}].", key); - reportActivity(key, metadata, lastRecordedTime, new ActivityReportCallback<>() { - @Override - public void onSuccess(Key key, long reportedTime) { - updateLastReportedTime(key, reportedTime); - } + long lastRecordedTime = currentState.getLastRecordedTime(); + long lastReportedTime = stateWrapper.getLastReportedTime(); + var metadata = currentState.getMetadata(); - @Override - public void onFailure(Key key, Throwable t) { - log.debug("Failed to report last activity event for key: [{}].", key, t); - } - }); - } + boolean hasExpired; + boolean shouldReport; + + var updatedState = updateState(key, currentState); + if (updatedState != null) { + stateWrapper.setState(updatedState); + lastRecordedTime = updatedState.getLastRecordedTime(); + metadata = updatedState.getMetadata(); + hasExpired = hasExpired(lastRecordedTime); + shouldReport = stateWrapper.getStrategy().onReportingPeriodEnd(); + } else { + states.remove(key); + hasExpired = false; + shouldReport = true; + } + + if (hasExpired) { + states.remove(key); + onStateExpiry(key, metadata); + shouldReport = true; + } + + if (shouldReport && lastReportedTime < lastRecordedTime) { + long timeToReport = lastRecordedTime; + log.debug("Going to report last activity event for key: [{}]. Event time: [{}].", key, timeToReport); + reportActivity(key, metadata, timeToReport, new ActivityReportCallback<>() { + @Override + public void onSuccess(Key key, long reportedTime) { + updateLastReportedTime(key, reportedTime); + } + + @Override + public void onFailure(Key key, Throwable t) { + log.debug("Failed to report last activity event for key: [{}]. Event time: [{}].", key, timeToReport, t); + } + }); } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstAndLastEventActivityStrategy.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstAndLastEventActivityStrategy.java index a5a59046ae..9ced9bd90b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstAndLastEventActivityStrategy.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstAndLastEventActivityStrategy.java @@ -16,7 +16,9 @@ package org.thingsboard.server.common.transport.activity.strategy; import lombok.EqualsAndHashCode; +import lombok.ToString; +@ToString @EqualsAndHashCode public final class FirstAndLastEventActivityStrategy implements ActivityStrategy { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstEventActivityStrategy.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstEventActivityStrategy.java index 5353635c45..3b5d9dfc8d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstEventActivityStrategy.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/strategy/FirstEventActivityStrategy.java @@ -16,7 +16,9 @@ package org.thingsboard.server.common.transport.activity.strategy; import lombok.EqualsAndHashCode; +import lombok.ToString; +@ToString @EqualsAndHashCode public final class FirstEventActivityStrategy implements ActivityStrategy {