From eaa0073003741144e7a78de9469645f2cf1415e0 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 29 May 2025 17:47:34 +0300 Subject: [PATCH 1/2] Cancel currently running tasks when the job is discarded --- .../KafkaCalculatedFieldStateService.java | 2 +- .../server/service/job/DummyJobProcessor.java | 1 + .../service/job/task/DummyTaskProcessor.java | 15 +++-- ...faultTbCalculatedFieldConsumerService.java | 2 +- .../queue/DefaultTbCoreConsumerService.java | 2 +- .../queue/DefaultTbEdgeConsumerService.java | 2 +- .../TbRuleEngineQueueConsumerManager.java | 1 + .../server/service/job/JobManagerTest.java | 26 +++++++++ .../ruleengine/TbRuleEngineStrategyTest.java | 2 +- .../data/job/DummyJobConfiguration.java | 1 + .../common/data/job/task/DummyTask.java | 1 + .../server/edqs/processor/EdqsProcessor.java | 2 +- .../edqs/state/KafkaEdqsStateService.java | 2 +- .../PartitionedQueueResponseTemplate.java | 2 +- .../consumer/MainQueueConsumerManager.java | 12 ++-- .../server/queue/task/TaskProcessor.java | 56 ++++++++++++++++--- 16 files changed, 103 insertions(+), 26 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 533b487d38..ded7adbc34 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -73,7 +73,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta .queueKey(queueKey) .topic(partitionService.getTopic(queueKey)) .pollInterval(pollInterval) - .msgPackProcessor((msgs, consumer, config) -> { + .msgPackProcessor((msgs, consumer, consumerKey, config) -> { for (TbProtoQueueMsg msg : msgs) { try { if (msg.getValue() != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java index 373bc4afee..d38cdfb6f5 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/DummyJobProcessor.java @@ -82,6 +82,7 @@ public class DummyJobProcessor implements JobProcessor { .processingTimeMs(configuration.getTaskProcessingTimeMs()) .errors(errors) .failAlways(failAlways) + .processingTimeoutMs(configuration.getTaskProcessingTimeoutMs()) .build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 5c88083146..0177e70b10 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -15,13 +15,16 @@ */ package org.thingsboard.server.service.job.task; -import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.tuple.Pair; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.DummyTask; import org.thingsboard.server.common.data.job.task.DummyTaskResult; +import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.queue.task.TaskProcessor; -@RequiredArgsConstructor +import java.util.Map; +import java.util.concurrent.Future; + public class DummyTaskProcessor extends TaskProcessor { @Override @@ -40,8 +43,12 @@ public class DummyTaskProcessor extends TaskProcessor 0 ? task.getProcessingTimeoutMs() : 2000; + } + + public Map, Future>> getCurrentTasks() { + return currentTasks; } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index f8c398b016..8d4ab25578 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -141,7 +141,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa } } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, Object consumerKey, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 264fa86456..f0b1a4d7d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -255,7 +255,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, Object consumerKey, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java index a9ae18a11f..3dd6993962 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java @@ -126,7 +126,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService> msgs, TbQueueConsumer> consumer, QueueConfig edgeQueueConfig) throws InterruptedException { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, Object consumerKey, QueueConfig edgeQueueConfig) throws InterruptedException { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 3aa0d2eabd..d067be49a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -127,6 +127,7 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager> msgs, TbQueueConsumer> consumer, + Object consumerKey, Queue queue) throws Exception { TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue); TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue); diff --git a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java index 0278e910d9..b23722afd5 100644 --- a/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/job/JobManagerTest.java @@ -172,6 +172,32 @@ public class JobManagerTest extends AbstractControllerTest { }); } + @Test + public void testCancelJob_whileTaskRunning() throws Exception { + JobId jobId = submitJob(DummyJobConfiguration.builder() + .successfulTasksCount(1) + .taskProcessingTimeMs(TimeUnit.HOURS.toMillis(1)) + .taskProcessingTimeoutMs(TimeUnit.HOURS.toMillis(1)) + .build()).getId(); + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(taskProcessor.getCurrentTasks()).isNotEmpty(); + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.RUNNING); + assertThat(job.getResult().getTotalCount()).isEqualTo(1); + assertThat(job.getResult().getCompletedCount()).isZero(); + }); + + cancelJob(jobId); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + Job job = findJobById(jobId); + assertThat(job.getStatus()).isEqualTo(JobStatus.CANCELLED); + assertThat(job.getResult().getTotalCount()).isEqualTo(1); + assertThat(job.getResult().getDiscardedCount()).isEqualTo(1); + assertThat(job.getResult().getFailedCount()).isZero(); + }); + } + @Test public void testCancelJob_simulateTaskProcessorRestart() throws Exception { int tasksCount = 10; diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java index 8c019029a5..1106fad5b6 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineStrategyTest.java @@ -238,7 +238,7 @@ public class TbRuleEngineStrategyTest { .map(this::toProto) .toList(); - consumerManager.processMsgs(protoMsgs, consumer, queue); + consumerManager.processMsgs(protoMsgs, consumer, queueKey, queue); processingData.forEach(data -> { verify(actorContext, times(data.attempts)).tell(argThat(msg -> diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java index a9ff9e7f4f..decdc71177 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/DummyJobConfiguration.java @@ -38,6 +38,7 @@ public class DummyJobConfiguration extends JobConfiguration { private int permanentlyFailedTasksCount; private List errors; private int retries; + private long taskProcessingTimeoutMs; private String generalError; private int submittedTasksBeforeGeneralError; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java index e0f670ad9e..66f0fc9ff9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/job/task/DummyTask.java @@ -36,6 +36,7 @@ public class DummyTask extends Task { private int number; private long processingTimeMs; + private long processingTimeoutMs; private List errors; // errors for each attempt private boolean failAlways; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 9fe6b2843e..510d2c3a41 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -120,7 +120,7 @@ public class EdqsProcessor implements TbQueueHandler, .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) .topic(topicService.buildTopicName(config.getEventsTopic())) .pollInterval(config.getPollInterval()) - .msgPackProcessor((msgs, consumer, config) -> { + .msgPackProcessor((msgs, consumer, consumerKey, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { if (consumer.isStopped()) { return; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 0c90235ecd..66bbb7a68a 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -91,7 +91,7 @@ public class KafkaEdqsStateService implements EdqsStateService { .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) .topic(topicService.buildTopicName(config.getStateTopic())) .pollInterval(config.getPollInterval()) - .msgPackProcessor((msgs, consumer, config) -> { + .msgPackProcessor((msgs, consumer, consumerKey, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { try { ToEdqsMsg msg = queueMsg.getValue(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java index d1e1be708f..7e913009f0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/PartitionedQueueResponseTemplate.java @@ -74,7 +74,7 @@ public class PartitionedQueueResponseTemplate processRequests(requests, consumer)) + .msgPackProcessor((requests, consumer, consumerKey, config) -> processRequests(requests, consumer)) .consumerCreator((config, tpi) -> consumerCreator.apply(tpi)) .consumerExecutor(consumerExecutor) .scheduler(scheduler) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index 86e0721dd8..db5bac7170 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -203,7 +203,7 @@ public class MainQueueConsumerManager consumerLoop = consumerExecutor.submit(() -> { ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString()); - consumerLoop(consumerTask.getConsumer()); + consumerLoop(consumerTask.getKey(), consumerTask.getConsumer()); log.info("[{}] Consumer stopped", consumerTask.getKey()); try { @@ -218,7 +218,7 @@ public class MainQueueConsumerManager consumer) { + private void consumerLoop(Object consumerKey, TbQueueConsumer consumer) { try { while (!stopped && !consumer.isStopped()) { try { @@ -226,7 +226,7 @@ public class MainQueueConsumerManager msgs, TbQueueConsumer consumer, C config) throws Exception { + protected void processMsgs(List msgs, TbQueueConsumer consumer, Object consumerKey, C config) throws Exception { log.trace("Processing {} messages", msgs.size()); - msgPackProcessor.process(msgs, consumer, config); + msgPackProcessor.process(msgs, consumer, consumerKey, config); log.trace("Processed {} messages", msgs.size()); } @@ -273,7 +273,7 @@ public class MainQueueConsumerManager { - void process(List msgs, TbQueueConsumer consumer, C config) throws Exception; + void process(List msgs, TbQueueConsumer consumer, Object consumerKey, C config) throws Exception; } public interface ConsumerWrapper { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index ef0e48e30a..62ca19a05f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.task; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -25,6 +26,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.SetCache; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.job.task.Task; import org.thingsboard.server.common.data.job.task.TaskResult; @@ -42,14 +44,18 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.settings.TasksQueueConfig; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; public abstract class TaskProcessor, R extends TaskResult> { @@ -68,6 +74,8 @@ public abstract class TaskProcessor, R extends TaskResult> { private MainQueueConsumerManager, QueueConfig> taskConsumer; private final ExecutorService taskExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(getJobType().name().toLowerCase() + "-task-processor")); + protected final Map, Future>> currentTasks = new ConcurrentHashMap<>(); + private final SetCache discarded = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); private final SetCache failed = new SetCache<>(TimeUnit.MINUTES.toMillis(60)); @@ -104,21 +112,24 @@ public abstract class TaskProcessor, R extends TaskResult> { if (event.getEvent() == ComponentLifecycleEvent.STOPPED) { log.info("Adding job {} ({}) to discarded", entityId, tasksKey); addToDiscarded(tasksKey); + cancelRunningTasks(tasksKey); } else if (event.getEvent() == ComponentLifecycleEvent.FAILED) { log.info("Adding job {} ({}) to failed", entityId, tasksKey); failed.add(tasksKey); + cancelRunningTasks(tasksKey); } } case TENANT -> { if (event.getEvent() == ComponentLifecycleEvent.DELETED) { - deletedTenants.add(entityId.getId()); log.info("Adding tenant {} to deleted", entityId); + deletedTenants.add(entityId.getId()); + cancelRunningTasks((TenantId) entityId); } } } } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig queueConfig) throws Exception { + private void processMsgs(List> msgs, TbQueueConsumer> consumer, Object consumerKey, QueueConfig queueConfig) throws Exception { for (TbProtoQueueMsg msg : msgs) { try { @SuppressWarnings("unchecked") @@ -135,7 +146,7 @@ public abstract class TaskProcessor, R extends TaskResult> { continue; } - processTask(task); + processTask(task, consumerKey); } catch (InterruptedException e) { throw e; } catch (Exception e) { @@ -145,30 +156,39 @@ public abstract class TaskProcessor, R extends TaskResult> { consumer.commit(); } - private void processTask(T task) throws InterruptedException { + private void processTask(T task, Object consumerKey) throws InterruptedException { task.setAttempt(task.getAttempt() + 1); log.debug("Processing task: {}", task); Future future = null; try { long startNs = System.nanoTime(); + long timeoutMs = getProcessingTimeout(task); + future = taskExecutor.submit(() -> process(task)); + currentTasks.put(consumerKey, Pair.of(task, future)); + R result; try { - result = future.get(getTaskProcessingTimeout(), TimeUnit.MILLISECONDS); + result = future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw e.getCause(); } catch (TimeoutException e) { - throw new TimeoutException("Timeout after " + getTaskProcessingTimeout() + " ms"); + throw new TimeoutException("Timeout after " + timeoutMs + " ms"); } + long timingNs = System.nanoTime() - startNs; log.info("Processed task in {} ms: {}", timingNs / 1000000.0, task); reportTaskResult(task, result); } catch (InterruptedException e) { throw e; + } catch (CancellationException e) { + if (!failed.contains(task.getKey()) && !deletedTenants.contains(task.getTenantId().getId())) { + reportTaskDiscarded(task); + } } catch (Throwable e) { log.error("Failed to process task (attempt {}): {}", task.getAttempt(), task, e); if (task.getAttempt() <= task.getRetries()) { - processTask(task); + processTask(task, consumerKey); } else { reportTaskFailure(task, e); } @@ -176,11 +196,31 @@ public abstract class TaskProcessor, R extends TaskResult> { if (future != null && !future.isDone()) { future.cancel(true); } + currentTasks.remove(consumerKey); } } public abstract R process(T task) throws Exception; + private void cancelRunningTasks(String tasksKey) { + cancelRunningTasks(task -> task.getKey().equals(tasksKey)); + } + + private void cancelRunningTasks(TenantId tenantId) { + cancelRunningTasks(task -> task.getTenantId().equals(tenantId)); + } + + private void cancelRunningTasks(Predicate> filter) { + currentTasks.values().forEach(entry -> { + Task task = entry.getKey(); + Future future = entry.getValue(); + if (filter.test(task)) { + log.debug("Cancelling running task {}", task); + future.cancel(true); + } + }); + } + private void reportTaskFailure(T task, Throwable error) { R taskResult = task.toFailed(error); reportTaskResult(task, taskResult); @@ -215,7 +255,7 @@ public abstract class TaskProcessor, R extends TaskResult> { taskExecutor.shutdownNow(); } - public abstract long getTaskProcessingTimeout(); + public abstract long getProcessingTimeout(T task); public abstract JobType getJobType(); From 830f022f7d3f1b5a4439c5bec6e97781db030a06 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 29 May 2025 17:54:23 +0300 Subject: [PATCH 2/2] Add permission checks for jobs api --- .../server/controller/BaseController.java | 12 ++++++++++-- .../server/controller/JobController.java | 18 ++++++++++++------ .../service/security/permission/Resource.java | 3 ++- .../permission/TenantAdminPermissions.java | 1 + 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index e5ea69dcb5..2443037ca0 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -23,8 +23,6 @@ import jakarta.servlet.ServletOutputStream; import jakarta.servlet.http.HttpServletResponse; import jakarta.validation.ConstraintViolation; import lombok.Getter; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.hibernate.exception.ConstraintViolationException; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -92,6 +90,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.HasId; +import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.id.MobileAppBundleId; import org.thingsboard.server.common.data.id.MobileAppId; import org.thingsboard.server.common.data.id.NotificationTargetId; @@ -108,6 +107,7 @@ import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.WidgetTypeId; import org.thingsboard.server.common.data.id.WidgetsBundleId; +import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.mobile.app.MobileApp; import org.thingsboard.server.common.data.mobile.bundle.MobileAppBundle; import org.thingsboard.server.common.data.notification.targets.NotificationTarget; @@ -146,6 +146,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.mobile.MobileAppBundleService; import org.thingsboard.server.dao.mobile.MobileAppService; import org.thingsboard.server.dao.model.ModelConstants; @@ -370,6 +371,9 @@ public abstract class BaseController { @Autowired protected NotificationTargetService notificationTargetService; + @Autowired + protected JobService jobService; + @Autowired protected CalculatedFieldService calculatedFieldService; @@ -825,6 +829,10 @@ public abstract class BaseController { return checkEntityId(notificationTargetId, notificationTargetService::findNotificationTargetById, operation); } + Job checkJobId(JobId jobId, Operation operation) throws ThingsboardException { + return checkEntityId(jobId, jobService::findJobById, operation); + } + protected I emptyId(EntityType entityType) { return (I) EntityIdFactory.getByTypeAndUuid(entityType, ModelConstants.NULL_UUID); } diff --git a/application/src/main/java/org/thingsboard/server/controller/JobController.java b/application/src/main/java/org/thingsboard/server/controller/JobController.java index 3f3ca5e64f..fbd75dfc63 100644 --- a/application/src/main/java/org/thingsboard/server/controller/JobController.java +++ b/application/src/main/java/org/thingsboard/server/controller/JobController.java @@ -35,8 +35,8 @@ import org.thingsboard.server.common.data.job.JobStatus; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.security.permission.Operation; import java.util.List; import java.util.UUID; @@ -53,13 +53,13 @@ import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERT @Slf4j public class JobController extends BaseController { - private final JobService jobService; private final JobManager jobManager; @GetMapping("/job/{id}") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public Job getJobById(@PathVariable UUID id) throws ThingsboardException { - return jobService.findJobById(getTenantId(), new JobId(id)); + JobId jobId = new JobId(id); + return checkJobId(jobId, Operation.READ); } @GetMapping("/jobs") @@ -93,19 +93,25 @@ public class JobController extends BaseController { @PostMapping("/job/{id}/cancel") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public void cancelJob(@PathVariable UUID id) throws ThingsboardException { - jobManager.cancelJob(getTenantId(), new JobId(id)); + JobId jobId = new JobId(id); + checkJobId(jobId, Operation.WRITE); + jobManager.cancelJob(getTenantId(), jobId); } @PostMapping("/job/{id}/reprocess") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public void reprocessJob(@PathVariable UUID id) throws ThingsboardException { - jobManager.reprocessJob(getTenantId(), new JobId(id)); + JobId jobId = new JobId(id); + checkJobId(jobId, Operation.WRITE); + jobManager.reprocessJob(getTenantId(), jobId); } @DeleteMapping("/job/{id}") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") public void deleteJob(@PathVariable UUID id) throws ThingsboardException { - jobService.deleteJob(getTenantId(), new JobId(id)); + JobId jobId = new JobId(id); + checkJobId(jobId, Operation.DELETE); + jobService.deleteJob(getTenantId(), jobId); } } diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java b/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java index 4cb281a719..2a92c040e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/Resource.java @@ -50,7 +50,8 @@ public enum Resource { VERSION_CONTROL, NOTIFICATION(EntityType.NOTIFICATION_TARGET, EntityType.NOTIFICATION_TEMPLATE, EntityType.NOTIFICATION_REQUEST, EntityType.NOTIFICATION_RULE), - MOBILE_APP_SETTINGS; + MOBILE_APP_SETTINGS, + JOB(EntityType.JOB); private final Set entityTypes; diff --git a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java index 7a67d6739e..58023be34d 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java +++ b/application/src/main/java/org/thingsboard/server/service/security/permission/TenantAdminPermissions.java @@ -55,6 +55,7 @@ public class TenantAdminPermissions extends AbstractPermissions { put(Resource.OAUTH2_CONFIGURATION_TEMPLATE, new PermissionChecker.GenericPermissionChecker(Operation.READ)); put(Resource.MOBILE_APP, tenantEntityPermissionChecker); put(Resource.MOBILE_APP_BUNDLE, tenantEntityPermissionChecker); + put(Resource.JOB, tenantEntityPermissionChecker); } public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {