Merge pull request #12175 from thingsboard/tmp-merge
rc + license:format
This commit is contained in:
commit
aff3f6ee44
@ -117,9 +117,10 @@ public class HousekeeperService {
|
|||||||
throw new IllegalArgumentException("Unsupported task type " + taskType);
|
throw new IllegalArgumentException("Unsupported task type " + taskType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<Object> future = null;
|
||||||
try {
|
try {
|
||||||
long startTs = System.currentTimeMillis();
|
long startTs = System.currentTimeMillis();
|
||||||
Future<Object> future = taskExecutor.submit(() -> {
|
future = taskExecutor.submit(() -> {
|
||||||
taskProcessor.process((T) task);
|
taskProcessor.process((T) task);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
@ -137,7 +138,7 @@ public class HousekeeperService {
|
|||||||
if (e instanceof ExecutionException) {
|
if (e instanceof ExecutionException) {
|
||||||
error = e.getCause();
|
error = e.getCause();
|
||||||
} else if (e instanceof TimeoutException) {
|
} else if (e instanceof TimeoutException) {
|
||||||
error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " seconds");
|
error = new TimeoutException("Timeout after " + config.getTaskProcessingTimeout() + " ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) {
|
if (msg.getTask().getAttempt() < config.getMaxReprocessingAttempts()) {
|
||||||
@ -153,6 +154,10 @@ public class HousekeeperService {
|
|||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
statsService.ifPresent(statsService -> statsService.reportFailure(taskType, msg));
|
statsService.ifPresent(statsService -> statsService.reportFailure(taskType, msg));
|
||||||
|
} finally {
|
||||||
|
if (future != null && !future.isDone()) {
|
||||||
|
future.cancel(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -20,6 +20,8 @@ import org.thingsboard.server.common.data.housekeeper.HousekeeperTask;
|
|||||||
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
|
import org.thingsboard.server.common.data.housekeeper.HousekeeperTaskType;
|
||||||
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
|
import org.thingsboard.server.common.msg.housekeeper.HousekeeperClient;
|
||||||
|
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
public abstract class HousekeeperTaskProcessor<T extends HousekeeperTask> {
|
public abstract class HousekeeperTaskProcessor<T extends HousekeeperTask> {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -29,4 +31,13 @@ public abstract class HousekeeperTaskProcessor<T extends HousekeeperTask> {
|
|||||||
|
|
||||||
public abstract HousekeeperTaskType getTaskType();
|
public abstract HousekeeperTaskType getTaskType();
|
||||||
|
|
||||||
|
public <V> V wait(Future<V> future) throws Exception {
|
||||||
|
try {
|
||||||
|
return future.get(); // will be interrupted after taskProcessingTimeout
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
future.cancel(true); // interrupting the underlying task
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -33,7 +33,7 @@ public class LatestTsDeletionTaskProcessor extends HousekeeperTaskProcessor<Late
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(LatestTsDeletionHousekeeperTask task) throws Exception {
|
public void process(LatestTsDeletionHousekeeperTask task) throws Exception {
|
||||||
timeseriesService.removeLatest(task.getTenantId(), task.getEntityId(), List.of(task.getKey())).get();
|
wait(timeseriesService.removeLatest(task.getTenantId(), task.getEntityId(), List.of(task.getKey())));
|
||||||
log.debug("[{}][{}][{}] Deleted latest telemetry for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
|
log.debug("[{}][{}][{}] Deleted latest telemetry for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -36,7 +36,7 @@ public class TsHistoryDeletionTaskProcessor extends HousekeeperTaskProcessor<TsH
|
|||||||
@Override
|
@Override
|
||||||
public void process(TsHistoryDeletionHousekeeperTask task) throws Exception {
|
public void process(TsHistoryDeletionHousekeeperTask task) throws Exception {
|
||||||
DeleteTsKvQuery deleteQuery = new BaseDeleteTsKvQuery(task.getKey(), 0, System.currentTimeMillis(), false, false);
|
DeleteTsKvQuery deleteQuery = new BaseDeleteTsKvQuery(task.getKey(), 0, System.currentTimeMillis(), false, false);
|
||||||
timeseriesService.remove(task.getTenantId(), task.getEntityId(), List.of(deleteQuery)).get();
|
wait(timeseriesService.remove(task.getTenantId(), task.getEntityId(), List.of(deleteQuery)));
|
||||||
log.debug("[{}][{}][{}] Deleted timeseries history for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
|
log.debug("[{}][{}][{}] Deleted timeseries history for key '{}'", task.getTenantId(), task.getEntityId().getEntityType(), task.getEntityId(), task.getKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -93,8 +93,12 @@ import java.util.Arrays;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -103,6 +107,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.doCallRealMethod;
|
import static org.mockito.Mockito.doCallRealMethod;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
@ -113,7 +118,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
|
|||||||
@TestPropertySource(properties = {
|
@TestPropertySource(properties = {
|
||||||
"queue.core.housekeeper.task-reprocessing-delay-ms=2000",
|
"queue.core.housekeeper.task-reprocessing-delay-ms=2000",
|
||||||
"queue.core.housekeeper.poll-interval-ms=1000",
|
"queue.core.housekeeper.poll-interval-ms=1000",
|
||||||
"queue.core.housekeeper.max-reprocessing-attempts=5"
|
"queue.core.housekeeper.max-reprocessing-attempts=5",
|
||||||
|
"queue.core.housekeeper.task-processing-timeout-ms=5000",
|
||||||
})
|
})
|
||||||
public class HousekeeperServiceTest extends AbstractControllerTest {
|
public class HousekeeperServiceTest extends AbstractControllerTest {
|
||||||
|
|
||||||
@ -320,7 +326,7 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void whenTaskProcessingFails_thenReprocess() throws Exception {
|
public void whenTaskProcessingFails_thenReprocess() throws Exception {
|
||||||
TimeoutException error = new TimeoutException("Test timeout");
|
Exception error = new RuntimeException("Just a test");
|
||||||
doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any());
|
doThrow(error).when(tsHistoryDeletionTaskProcessor).process(any());
|
||||||
|
|
||||||
Device device = createDevice("test", "test");
|
Device device = createDevice("test", "test");
|
||||||
@ -344,6 +350,54 @@ public class HousekeeperServiceTest extends AbstractControllerTest {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenTaskProcessingTimedOut_thenInterruptAndReprocess() throws Exception {
|
||||||
|
ExecutorService someExecutor = Executors.newSingleThreadExecutor();
|
||||||
|
AtomicBoolean taskInterrupted = new AtomicBoolean(false);
|
||||||
|
AtomicBoolean underlyingTaskInterrupted = new AtomicBoolean(false);
|
||||||
|
doAnswer(invocationOnMock -> {
|
||||||
|
Future<?> future = someExecutor.submit(() -> {
|
||||||
|
try {
|
||||||
|
Thread.sleep(TimeUnit.HOURS.toMillis(24));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
underlyingTaskInterrupted.set(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
taskInterrupted.set(true);
|
||||||
|
future.cancel(true);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}).when(tsHistoryDeletionTaskProcessor).process(any());
|
||||||
|
|
||||||
|
Device device = createDevice("test", "test");
|
||||||
|
createRelatedData(device.getId());
|
||||||
|
|
||||||
|
doDelete("/api/device/" + device.getId()).andExpect(status().isOk());
|
||||||
|
|
||||||
|
int attempts = 2;
|
||||||
|
await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
for (int i = 0; i <= attempts; i++) {
|
||||||
|
int attempt = i;
|
||||||
|
verify(housekeeperReprocessingService).submitForReprocessing(argThat(getTaskMatcher(device.getId(), HousekeeperTaskType.DELETE_TS_HISTORY,
|
||||||
|
task -> task.getAttempt() == attempt)), argThat(error -> error instanceof TimeoutException));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertThat(taskInterrupted).isTrue();
|
||||||
|
assertThat(underlyingTaskInterrupted).isTrue();
|
||||||
|
|
||||||
|
assertThat(getTimeseriesHistory(device.getId())).isNotEmpty();
|
||||||
|
doCallRealMethod().when(tsHistoryDeletionTaskProcessor).process(any());
|
||||||
|
someExecutor.shutdown();
|
||||||
|
|
||||||
|
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(getTimeseriesHistory(device.getId())).isEmpty();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception {
|
public void whenReprocessingAttemptsExceeded_thenDropTheTask() throws Exception {
|
||||||
TimeoutException error = new TimeoutException("Test timeout");
|
TimeoutException error = new TimeoutException("Test timeout");
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
/**
|
/**
|
||||||
* Copyright © 2016-2024 The Thingsboard Authors
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
* <p>
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
* You may obtain a copy of the License at
|
* You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
|||||||
@ -130,7 +130,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
|
|
||||||
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
|
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) {
|
||||||
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
|
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest);
|
||||||
this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Processing compile request, scriptId: [%s], compileRequest [%s]', requestId, scriptId, compileRequest);
|
||||||
if (this.scriptMap.has(scriptId)) {
|
if (this.scriptMap.has(scriptId)) {
|
||||||
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
|
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true);
|
||||||
this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Script was already compiled, scriptId: [%s]', requestId, scriptId);
|
||||||
@ -154,7 +154,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
|
|
||||||
processInvokeRequest(requestId: string, responseTopic: string, headers: any, invokeRequest: JsInvokeRequest) {
|
processInvokeRequest(requestId: string, responseTopic: string, headers: any, invokeRequest: JsInvokeRequest) {
|
||||||
const scriptId = JsInvokeMessageProcessor.getScriptId(invokeRequest);
|
const scriptId = JsInvokeMessageProcessor.getScriptId(invokeRequest);
|
||||||
this.logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Processing invoke request, scriptId: [%s], invokeRequest [%s]', requestId, scriptId, invokeRequest);
|
||||||
this.executedScriptsCounter++;
|
this.executedScriptsCounter++;
|
||||||
if (this.executedScriptsCounter % statFrequency == 0) {
|
if (this.executedScriptsCounter % statFrequency == 0) {
|
||||||
const nowMs = performance.now();
|
const nowMs = performance.now();
|
||||||
@ -217,7 +217,7 @@ export class JsInvokeMessageProcessor {
|
|||||||
|
|
||||||
processReleaseRequest(requestId: string, responseTopic: string, headers: any, releaseRequest: JsReleaseRequest) {
|
processReleaseRequest(requestId: string, responseTopic: string, headers: any, releaseRequest: JsReleaseRequest) {
|
||||||
const scriptId = JsInvokeMessageProcessor.getScriptId(releaseRequest);
|
const scriptId = JsInvokeMessageProcessor.getScriptId(releaseRequest);
|
||||||
this.logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId);
|
this.logger.debug('[%s] Processing release request, scriptId: [%s], releaseRequest [%s]', requestId, scriptId, releaseRequest);
|
||||||
if (this.scriptMap.has(scriptId)) {
|
if (this.scriptMap.has(scriptId)) {
|
||||||
const index = this.scriptIds.indexOf(scriptId);
|
const index = this.scriptIds.indexOf(scriptId);
|
||||||
if (index > -1) {
|
if (index > -1) {
|
||||||
|
|||||||
@ -115,7 +115,7 @@ public class SemaphoreWithTbMsgQueue {
|
|||||||
}
|
}
|
||||||
TbMsg msg = tbMsgTbContext.msg();
|
TbMsg msg = tbMsgTbContext.msg();
|
||||||
TbContext ctx = tbMsgTbContext.ctx();
|
TbContext ctx = tbMsgTbContext.ctx();
|
||||||
log.warn("[{}] Failed to process message: {}", entityId, msg, t);
|
log.debug("[{}] Failed to process message: {}", entityId, msg, t);
|
||||||
ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed
|
ctx.tellFailure(msg, t); // you are not allowed to throw here, because queue will remain unprocessed
|
||||||
continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty
|
continue; // We are probably the last who process the queue. We have to continue poll until get successful callback or queue is empty
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user