Merge branch 'hotfix/3.7' of github.com:thingsboard/thingsboard into master-hotfix-3.7
This commit is contained in:
commit
c4d0867635
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.scheduler;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
|
||||
@ -26,14 +27,15 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DefaultSchedulerComponent implements SchedulerComponent {
|
||||
|
||||
protected ScheduledExecutorService schedulerExecutor;
|
||||
private ScheduledExecutorService schedulerExecutor;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler"));
|
||||
schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler"));
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
@ -52,10 +54,19 @@ public class DefaultSchedulerComponent implements SchedulerComponent {
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return schedulerExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
|
||||
return schedulerExecutor.scheduleAtFixedRate(() -> runSafely(command), initialDelay, period, unit);
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return schedulerExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
|
||||
return schedulerExecutor.scheduleWithFixedDelay(() -> runSafely(command), initialDelay, delay, unit);
|
||||
}
|
||||
|
||||
private void runSafely(Runnable command) {
|
||||
try {
|
||||
command.run();
|
||||
} catch (Throwable t) {
|
||||
log.error("Unexpected error occurred while executing task!", t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.queue.scheduler;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class DefaultSchedulerComponentTest {
|
||||
|
||||
DefaultSchedulerComponent schedulerComponent;
|
||||
|
||||
@BeforeEach
|
||||
void setup() {
|
||||
schedulerComponent = new DefaultSchedulerComponent();
|
||||
schedulerComponent.init();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void cleanup() {
|
||||
schedulerComponent.destroy();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception")
|
||||
void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
|
||||
// GIVEN
|
||||
var wasExecutedAtLeastOnce = new AtomicBoolean(false);
|
||||
|
||||
Runnable exceptionThrowingCommand = () -> {
|
||||
try {
|
||||
throw new RuntimeException("Unexpected exception");
|
||||
} finally {
|
||||
wasExecutedAtLeastOnce.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
// WHEN
|
||||
ScheduledFuture<?> future = schedulerComponent.scheduleAtFixedRate(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS);
|
||||
|
||||
// THEN
|
||||
Awaitility.await().alias("Wait until command is executed at least once")
|
||||
.atMost(5, TimeUnit.SECONDS)
|
||||
.until(wasExecutedAtLeastOnce::get);
|
||||
|
||||
assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception")
|
||||
void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
|
||||
// GIVEN
|
||||
var wasExecutedAtLeastOnce = new AtomicBoolean(false);
|
||||
|
||||
Runnable exceptionThrowingCommand = () -> {
|
||||
try {
|
||||
throw new RuntimeException("Unexpected exception");
|
||||
} finally {
|
||||
wasExecutedAtLeastOnce.set(true);
|
||||
}
|
||||
};
|
||||
|
||||
// WHEN
|
||||
ScheduledFuture<?> future = schedulerComponent.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS);
|
||||
|
||||
// THEN
|
||||
Awaitility.await().alias("Wait until command is executed at least once")
|
||||
.atMost(5, TimeUnit.SECONDS)
|
||||
.until(wasExecutedAtLeastOnce::get);
|
||||
|
||||
assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse();
|
||||
}
|
||||
|
||||
}
|
||||
@ -69,7 +69,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
log.error("Failed to process activity event: provided activity key is null.");
|
||||
return;
|
||||
}
|
||||
log.debug("Received activity event for key: [{}]", key);
|
||||
log.debug("Received activity event for key: [{}]. Event time: [{}].", key, newLastRecordedTime);
|
||||
|
||||
var shouldReport = new AtomicBoolean(false);
|
||||
var lastRecordedTime = new AtomicLong();
|
||||
@ -94,7 +94,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
});
|
||||
|
||||
if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
|
||||
log.debug("Going to report first activity event for key: [{}].", key);
|
||||
log.debug("Going to report first activity event for key: [{}]. Event time: [{}].", key, lastRecordedTime.get());
|
||||
reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Key key, long reportedTime) {
|
||||
@ -103,7 +103,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
|
||||
@Override
|
||||
public void onFailure(Key key, Throwable t) {
|
||||
log.debug("Failed to report first activity event for key: [{}].", key, t);
|
||||
log.debug("Failed to report first activity event for key: [{}]. Event time: [{}].", key, lastRecordedTime.get(), t);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -113,50 +113,59 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
|
||||
public void onReportingPeriodEnd() {
|
||||
log.debug("Going to end reporting period.");
|
||||
for (Map.Entry<Key, ActivityStateWrapper> entry : states.entrySet()) {
|
||||
var key = entry.getKey();
|
||||
var stateWrapper = entry.getValue();
|
||||
var currentState = stateWrapper.getState();
|
||||
|
||||
long lastRecordedTime = currentState.getLastRecordedTime();
|
||||
long lastReportedTime = stateWrapper.getLastReportedTime();
|
||||
var metadata = currentState.getMetadata();
|
||||
|
||||
boolean hasExpired;
|
||||
boolean shouldReport;
|
||||
|
||||
var updatedState = updateState(key, currentState);
|
||||
if (updatedState != null) {
|
||||
stateWrapper.setState(updatedState);
|
||||
lastRecordedTime = updatedState.getLastRecordedTime();
|
||||
metadata = updatedState.getMetadata();
|
||||
hasExpired = hasExpired(lastRecordedTime);
|
||||
shouldReport = stateWrapper.getStrategy().onReportingPeriodEnd();
|
||||
} else {
|
||||
states.remove(key);
|
||||
hasExpired = false;
|
||||
shouldReport = true;
|
||||
Key key = entry.getKey();
|
||||
ActivityStateWrapper stateWrapper = entry.getValue();
|
||||
try {
|
||||
reportLastEvent(key, stateWrapper);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to report last activity event on reporting period end for key: [{}]. State: [{}].", key, stateWrapper, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasExpired) {
|
||||
states.remove(key);
|
||||
onStateExpiry(key, metadata);
|
||||
shouldReport = true;
|
||||
}
|
||||
private void reportLastEvent(Key key, ActivityStateWrapper stateWrapper) {
|
||||
var currentState = stateWrapper.getState();
|
||||
|
||||
if (shouldReport && lastReportedTime < lastRecordedTime) {
|
||||
log.debug("Going to report last activity event for key: [{}].", key);
|
||||
reportActivity(key, metadata, lastRecordedTime, new ActivityReportCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Key key, long reportedTime) {
|
||||
updateLastReportedTime(key, reportedTime);
|
||||
}
|
||||
long lastRecordedTime = currentState.getLastRecordedTime();
|
||||
long lastReportedTime = stateWrapper.getLastReportedTime();
|
||||
var metadata = currentState.getMetadata();
|
||||
|
||||
@Override
|
||||
public void onFailure(Key key, Throwable t) {
|
||||
log.debug("Failed to report last activity event for key: [{}].", key, t);
|
||||
}
|
||||
});
|
||||
}
|
||||
boolean hasExpired;
|
||||
boolean shouldReport;
|
||||
|
||||
var updatedState = updateState(key, currentState);
|
||||
if (updatedState != null) {
|
||||
stateWrapper.setState(updatedState);
|
||||
lastRecordedTime = updatedState.getLastRecordedTime();
|
||||
metadata = updatedState.getMetadata();
|
||||
hasExpired = hasExpired(lastRecordedTime);
|
||||
shouldReport = stateWrapper.getStrategy().onReportingPeriodEnd();
|
||||
} else {
|
||||
states.remove(key);
|
||||
hasExpired = false;
|
||||
shouldReport = true;
|
||||
}
|
||||
|
||||
if (hasExpired) {
|
||||
states.remove(key);
|
||||
onStateExpiry(key, metadata);
|
||||
shouldReport = true;
|
||||
}
|
||||
|
||||
if (shouldReport && lastReportedTime < lastRecordedTime) {
|
||||
long timeToReport = lastRecordedTime;
|
||||
log.debug("Going to report last activity event for key: [{}]. Event time: [{}].", key, timeToReport);
|
||||
reportActivity(key, metadata, timeToReport, new ActivityReportCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Key key, long reportedTime) {
|
||||
updateLastReportedTime(key, reportedTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Key key, Throwable t) {
|
||||
log.debug("Failed to report last activity event for key: [{}]. Event time: [{}].", key, timeToReport, t);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -16,7 +16,9 @@
|
||||
package org.thingsboard.server.common.transport.activity.strategy;
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public final class FirstAndLastEventActivityStrategy implements ActivityStrategy {
|
||||
|
||||
|
||||
@ -16,7 +16,9 @@
|
||||
package org.thingsboard.server.common.transport.activity.strategy;
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
|
||||
@ToString
|
||||
@EqualsAndHashCode
|
||||
public final class FirstEventActivityStrategy implements ActivityStrategy {
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user