Refactoring for tasks api

This commit is contained in:
ViacheslavKlimov 2025-05-13 11:54:36 +03:00
parent faf4a2165f
commit 7d0b6bfdec
14 changed files with 67 additions and 22 deletions

View File

@ -1908,6 +1908,7 @@ queue:
# In a single-tenant environment, use 'entity' strategy to distribute the tasks among multiple partitions.
partitioning_strategy: "${TB_QUEUE_TASKS_PARTITIONING_STRATEGY:tenant}"
stats:
topic: "${TB_QUEUE_TASKS_STATS_TOPIC:jobs.stats}"
# Interval in milliseconds to process job stats
processing_interval_ms: "${TB_QUEUE_TASKS_STATS_PROCESSING_INTERVAL_MS:1000}"

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.queue;
public interface TbQueueCallback {
TbQueueCallback EMPTY = new TbQueueCallback() {
@ -34,4 +33,5 @@ public interface TbQueueCallback {
void onSuccess(TbQueueMsgMetadata metadata);
void onFailure(Throwable t);
}

View File

@ -43,6 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
@ -68,6 +69,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final EdqsConfig edqsConfig;
private final TasksQueueConfig tasksQueueConfig;
private final InMemoryStorage storage;
@Override
@ -267,7 +269,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return new InMemoryTbQueueConsumer<>(storage, "jobs.stats");
return new InMemoryTbQueueConsumer<>(storage, tasksQueueConfig.getStatsTopic());
}
@Scheduled(fixedRateString = "${queue.in_memory.stats.print-interval-ms:60000}")

View File

@ -66,6 +66,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
@ -95,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final EdqsConfig edqsConfig;
private final TasksQueueConfig tasksQueueConfig;
private final TbQueueAdmin coreAdmin;
private final TbKafkaAdmin ruleEngineAdmin;
@ -130,7 +132,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbQueueCalculatedFieldSettings calculatedFieldSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbKafkaTopicConfigs kafkaTopicConfigs,
EdqsConfig edqsConfig) {
EdqsConfig edqsConfig,
TasksQueueConfig tasksQueueConfig) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.serviceInfoProvider = serviceInfoProvider;
@ -144,6 +147,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.edqsConfig = edqsConfig;
this.tasksQueueConfig = tasksQueueConfig;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -660,7 +664,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName("jobs.stats"))
.topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic()))
.clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName("job-stats-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders()))

View File

@ -62,6 +62,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
@ -91,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueEdgeSettings edgeSettings;
private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
private final EdqsConfig edqsConfig;
private final TasksQueueConfig tasksQueueConfig;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -126,6 +128,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueCalculatedFieldSettings calculatedFieldSettings,
EdqsConfig edqsConfig,
TasksQueueConfig tasksQueueConfig,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
@ -140,6 +143,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.edgeSettings = edgeSettings;
this.calculatedFieldSettings = calculatedFieldSettings;
this.edqsConfig = edqsConfig;
this.tasksQueueConfig = tasksQueueConfig;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -539,7 +543,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName("jobs.stats"))
.topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic()))
.clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName("job-stats-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders()))

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Data
@Component
public class TasksQueueConfig {
@Value("${queue.tasks.poll_interval}")
private int pollInterval;
@Value("${queue.tasks.stats.topic}")
private String statsTopic;
}

View File

@ -27,6 +27,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'")
@ -34,6 +35,7 @@ import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFactory {
private final InMemoryStorage storage;
private final TasksQueueConfig tasksQueueConfig;
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
@ -42,7 +44,7 @@ public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFact
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return new InMemoryTbQueueProducer<>(storage, "jobs.stats");
return new InMemoryTbQueueProducer<>(storage, tasksQueueConfig.getStatsTopic());
}
}

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
@ -39,6 +40,7 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory
private final TopicService topicService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TasksQueueConfig tasksQueueConfig;
private final TbKafkaSettings kafkaSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
@ -46,12 +48,14 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory
public KafkaTaskProcessorQueueFactory(TopicService topicService,
TbServiceInfoProvider serviceInfoProvider,
TasksQueueConfig tasksQueueConfig,
TbKafkaSettings kafkaSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.serviceInfoProvider = serviceInfoProvider;
this.kafkaSettings = kafkaSettings;
this.topicService = topicService;
this.serviceInfoProvider = serviceInfoProvider;
this.tasksQueueConfig = tasksQueueConfig;
this.kafkaSettings = kafkaSettings;
this.consumerStatsService = consumerStatsService;
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
}
@ -73,7 +77,7 @@ public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName("jobs.stats"))
.defaultTopic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic()))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();

View File

@ -20,7 +20,6 @@ import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.SetCache;
@ -40,6 +39,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import java.util.List;
import java.util.Set;
@ -61,9 +61,8 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
private JobStatsService statsService;
@Autowired
private TaskProcessorExecutors executors;
@Value("${queue.tasks.poll_interval:500}")
private int pollInterval;
@Autowired
private TasksQueueConfig config;
private QueueKey queueKey;
private MainQueueConsumerManager<TbProtoQueueMsg<TaskProto>, QueueConfig> taskConsumer;
@ -77,7 +76,7 @@ public abstract class TaskProcessor<T extends Task<R>, R extends TaskResult> {
queueKey = new QueueKey(ServiceType.TASK_PROCESSOR, getJobType().name());
taskConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<TaskProto>, QueueConfig>builder()
.queueKey(queueKey)
.config(QueueConfig.of(true, pollInterval))
.config(QueueConfig.of(true, config.getPollInterval()))
.msgPackProcessor(this::processMsgs)
.consumerCreator((queueConfig, tpi) -> queueFactory.createTaskConsumer(getJobType()))
.consumerExecutor(executors.getConsumersExecutor())

View File

@ -181,7 +181,7 @@ public class DefaultJobService extends AbstractEntityService implements JobServi
@Override
public Job findLatestJobByKey(TenantId tenantId, String key) {
return jobDao.findLatestByKey(tenantId, key);
return jobDao.findLatestByTenantIdAndKey(tenantId, key);
}
private Job findForUpdate(TenantId tenantId, JobId jobId) {

View File

@ -31,7 +31,7 @@ public interface JobDao extends Dao<Job> {
Job findByIdForUpdate(TenantId tenantId, JobId jobId);
Job findLatestByKey(TenantId tenantId, String key);
Job findLatestByTenantIdAndKey(TenantId tenantId, String key);
boolean existsByTenantAndKeyAndStatusOneOf(TenantId tenantId, String key, JobStatus... statuses);

View File

@ -25,8 +25,6 @@ import jakarta.persistence.Table;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.JdbcType;
import org.hibernate.dialect.PostgreSQLJsonPGObjectJsonbType;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobConfiguration;
@ -68,7 +66,6 @@ public class JobEntity extends BaseSqlEntity<Job> {
private JsonNode configuration;
@Convert(converter = JsonConverter.class)
@JdbcType(PostgreSQLJsonPGObjectJsonbType.class)
@Column(name = ModelConstants.JOB_RESULT_PROPERTY)
private JsonNode result;

View File

@ -59,7 +59,7 @@ public class JpaJobDao extends JpaAbstractDao<JobEntity, Job> implements JobDao
}
@Override
public Job findLatestByKey(TenantId tenantId, String key) {
public Job findLatestByTenantIdAndKey(TenantId tenantId, String key) {
return DaoUtil.getData(jobRepository.findLatestByTenantIdAndKey(tenantId.getId(), key));
}

View File

@ -957,6 +957,6 @@ CREATE TABLE IF NOT EXISTS job (
key varchar NOT NULL,
description varchar NOT NULL,
status varchar NOT NULL,
configuration varchar(1000000) NOT NULL,
result jsonb
configuration varchar NOT NULL,
result varchar
);