From 7d0b6bfdec480d00784890d0b517b4350758c51a Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 13 May 2025 11:54:36 +0300 Subject: [PATCH] Refactoring for tasks api --- .../src/main/resources/thingsboard.yml | 1 + .../server/queue/TbQueueCallback.java | 2 +- .../InMemoryMonolithQueueFactory.java | 4 ++- .../provider/KafkaMonolithQueueFactory.java | 8 +++-- .../provider/KafkaTbCoreQueueFactory.java | 6 +++- .../queue/settings/TasksQueueConfig.java | 32 +++++++++++++++++++ .../InMemoryTaskProcessorQueueFactory.java | 4 ++- .../task/KafkaTaskProcessorQueueFactory.java | 10 ++++-- .../server/queue/task/TaskProcessor.java | 9 +++--- .../server/dao/job/DefaultJobService.java | 2 +- .../thingsboard/server/dao/job/JobDao.java | 2 +- .../server/dao/model/sql/JobEntity.java | 3 -- .../server/dao/sql/job/JpaJobDao.java | 2 +- .../main/resources/sql/schema-entities.sql | 4 +-- 14 files changed, 67 insertions(+), 22 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9d14afb541..4211b4e845 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1908,6 +1908,7 @@ queue: # In a single-tenant environment, use 'entity' strategy to distribute the tasks among multiple partitions. partitioning_strategy: "${TB_QUEUE_TASKS_PARTITIONING_STRATEGY:tenant}" stats: + topic: "${TB_QUEUE_TASKS_STATS_TOPIC:jobs.stats}" # Interval in milliseconds to process job stats processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}" diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java index e15d9c8ace..99523bf21f 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueCallback.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.queue; - public interface TbQueueCallback { TbQueueCallback EMPTY = new TbQueueCallback() { @@ -34,4 +33,5 @@ public interface TbQueueCallback { void onSuccess(TbQueueMsgMetadata metadata); void onFailure(Throwable t); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index a164237366..ddaf34715f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -43,6 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; +import org.thingsboard.server.queue.settings.TasksQueueConfig; import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; @@ -68,6 +69,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE private final TbQueueEdgeSettings edgeSettings; private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final EdqsConfig edqsConfig; + private final TasksQueueConfig tasksQueueConfig; private final InMemoryStorage storage; @Override @@ -267,7 +269,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @Override public TbQueueConsumer> createJobStatsConsumer() { - return new InMemoryTbQueueConsumer<>(storage, "jobs.stats"); + return new InMemoryTbQueueConsumer<>(storage, tasksQueueConfig.getStatsTopic()); } @Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index b15e60f09d..902848f65f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -66,6 +66,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TasksQueueConfig; import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; @@ -95,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final TbKafkaConsumerStatsService consumerStatsService; private final EdqsConfig edqsConfig; + private final TasksQueueConfig tasksQueueConfig; private final TbQueueAdmin coreAdmin; private final TbKafkaAdmin ruleEngineAdmin; @@ -130,7 +132,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbQueueCalculatedFieldSettings calculatedFieldSettings, TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs, - EdqsConfig edqsConfig) { + EdqsConfig edqsConfig, + TasksQueueConfig tasksQueueConfig) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; this.serviceInfoProvider = serviceInfoProvider; @@ -144,6 +147,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.edgeSettings = edgeSettings; this.calculatedFieldSettings = calculatedFieldSettings; this.edqsConfig = edqsConfig; + this.tasksQueueConfig = tasksQueueConfig; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -660,7 +664,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi public TbQueueConsumer> createJobStatsConsumer() { return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) - .topic(topicService.buildTopicName("jobs.stats")) + .topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic())) .clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("job-stats-consumer-group")) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders())) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 0bf6acd830..048b08f15a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -62,6 +62,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TasksQueueConfig; import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.queue.settings.TbQueueCoreSettings; import org.thingsboard.server.queue.settings.TbQueueEdgeSettings; @@ -91,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueEdgeSettings edgeSettings; private final TbQueueCalculatedFieldSettings calculatedFieldSettings; private final EdqsConfig edqsConfig; + private final TasksQueueConfig tasksQueueConfig; private final TbQueueAdmin coreAdmin; private final TbQueueAdmin ruleEngineAdmin; @@ -126,6 +128,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { TbQueueTransportNotificationSettings transportNotificationSettings, TbQueueCalculatedFieldSettings calculatedFieldSettings, EdqsConfig edqsConfig, + TasksQueueConfig tasksQueueConfig, TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; @@ -140,6 +143,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.edgeSettings = edgeSettings; this.calculatedFieldSettings = calculatedFieldSettings; this.edqsConfig = edqsConfig; + this.tasksQueueConfig = tasksQueueConfig; this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs()); this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs()); @@ -539,7 +543,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { public TbQueueConsumer> createJobStatsConsumer() { return TbKafkaConsumerTemplate.>builder() .settings(kafkaSettings) - .topic(topicService.buildTopicName("jobs.stats")) + .topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic())) .clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId()) .groupId(topicService.buildTopicName("job-stats-consumer-group")) .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders())) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java new file mode 100644 index 0000000000..f4916a411e --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TasksQueueConfig.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.settings; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Data +@Component +public class TasksQueueConfig { + + @Value("${queue.tasks.poll_interval}") + private int pollInterval; + + @Value("${queue.tasks.stats.topic}") + private String statsTopic; + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java index dbe302dfee..76effa6304 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java @@ -27,6 +27,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.memory.InMemoryStorage; import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; +import org.thingsboard.server.queue.settings.TasksQueueConfig; @Component @ConditionalOnExpression("'${queue.type:null}'=='in-memory'") @@ -34,6 +35,7 @@ import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFactory { private final InMemoryStorage storage; + private final TasksQueueConfig tasksQueueConfig; @Override public TbQueueConsumer> createTaskConsumer(JobType jobType) { @@ -42,7 +44,7 @@ public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFact @Override public TbQueueProducer> createJobStatsProducer() { - return new InMemoryTbQueueProducer<>(storage, "jobs.stats"); + return new InMemoryTbQueueProducer<>(storage, tasksQueueConfig.getStatsTopic()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java index 77a47a20c8..6b95b0530c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java @@ -32,6 +32,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; import org.thingsboard.server.queue.kafka.TbKafkaSettings; import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.settings.TasksQueueConfig; @Component @ConditionalOnExpression("'${queue.type:null}'=='kafka'") @@ -39,6 +40,7 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory private final TopicService topicService; private final TbServiceInfoProvider serviceInfoProvider; + private final TasksQueueConfig tasksQueueConfig; private final TbKafkaSettings kafkaSettings; private final TbKafkaConsumerStatsService consumerStatsService; @@ -46,12 +48,14 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory public KafkaTaskProcessorQueueFactory(TopicService topicService, TbServiceInfoProvider serviceInfoProvider, + TasksQueueConfig tasksQueueConfig, TbKafkaSettings kafkaSettings, TbKafkaConsumerStatsService consumerStatsService, TbKafkaTopicConfigs kafkaTopicConfigs) { - this.serviceInfoProvider = serviceInfoProvider; - this.kafkaSettings = kafkaSettings; this.topicService = topicService; + this.serviceInfoProvider = serviceInfoProvider; + this.tasksQueueConfig = tasksQueueConfig; + this.kafkaSettings = kafkaSettings; this.consumerStatsService = consumerStatsService; this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs()); } @@ -73,7 +77,7 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory public TbQueueProducer> createJobStatsProducer() { return TbKafkaProducerTemplate.>builder() .clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName("jobs.stats")) + .defaultTopic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic())) .settings(kafkaSettings) .admin(tasksAdmin) .build(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index ec113c03a1..947d252442 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -20,7 +20,6 @@ import jakarta.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.event.EventListener; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.SetCache; @@ -40,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.queue.settings.TasksQueueConfig; import java.util.List; import java.util.Set; @@ -61,9 +61,8 @@ public abstract class TaskProcessor, R extends TaskResult> { private JobStatsService statsService; @Autowired private TaskProcessorExecutors executors; - - @Value("${queue.tasks.poll_interval:500}") - private int pollInterval; + @Autowired + private TasksQueueConfig config; private QueueKey queueKey; private MainQueueConsumerManager, QueueConfig> taskConsumer; @@ -77,7 +76,7 @@ public abstract class TaskProcessor, R extends TaskResult> { queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name()); taskConsumer = MainQueueConsumerManager., QueueConfig>builder() .queueKey(queueKey) - .config(QueueConfig.of(true, pollInterval)) + .config(QueueConfig.of(true, config.getPollInterval())) .msgPackProcessor(this::processMsgs) .consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType())) .consumerExecutor(executors.getConsumersExecutor()) diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java index 680bc81566..ba01b10326 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/DefaultJobService.java @@ -181,7 +181,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi @Override public Job findLatestJobByKey(TenantId tenantId, String key) { - return jobDao.findLatestByKey(tenantId, key); + return jobDao.findLatestByTenantIdAndKey(tenantId, key); } private Job findForUpdate(TenantId tenantId, JobId jobId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java index 0c70fd102d..46cc70aea8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/job/JobDao.java @@ -31,7 +31,7 @@ public interface JobDao extends Dao { Job findByIdForUpdate(TenantId tenantId, JobId jobId); - Job findLatestByKey(TenantId tenantId, String key); + Job findLatestByTenantIdAndKey(TenantId tenantId, String key); boolean existsByTenantAndKeyAndStatusOneOf(TenantId tenantId, String key, JobStatus... statuses); diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java index d13c4bbed3..498541962b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/JobEntity.java @@ -25,8 +25,6 @@ import jakarta.persistence.Table; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; -import org.hibernate.annotations.JdbcType; -import org.hibernate.dialect.PostgreSQLJsonPGObjectJsonbType; import org.thingsboard.server.common.data.id.JobId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobConfiguration; @@ -68,7 +66,6 @@ public class JobEntity extends BaseSqlEntity { private JsonNode configuration; @Convert(converter = JsonConverter.class) - @JdbcType(PostgreSQLJsonPGObjectJsonbType.class) @Column(name = ModelConstants.JOB_RESULT_PROPERTY) private JsonNode result; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java index f59898eb73..5c9fe230e8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/job/JpaJobDao.java @@ -59,7 +59,7 @@ public class JpaJobDao extends JpaAbstractDao implements JobDao } @Override - public Job findLatestByKey(TenantId tenantId, String key) { + public Job findLatestByTenantIdAndKey(TenantId tenantId, String key) { return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key)); } diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index ba43a54697..62ce86a76c 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -957,6 +957,6 @@ CREATE TABLE IF NOT EXISTS job ( key varchar NOT NULL, description varchar NOT NULL, status varchar NOT NULL, - configuration varchar(1000000) NOT NULL, - result jsonb + configuration varchar NOT NULL, + result varchar );