Support for job manager on Rule Engine

This commit is contained in:
ViacheslavKlimov 2025-05-19 15:30:34 +03:00
parent 325c71f2ab
commit 5e46608abc
16 changed files with 269 additions and 106 deletions

View File

@ -31,6 +31,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
import org.thingsboard.rule.engine.api.NotificationCenter;
@ -553,11 +554,14 @@ public class ActorSystemContext {
@Getter
private CalculatedFieldQueueService calculatedFieldQueueService;
@Lazy
@Autowired(required = false)
@Autowired
@Getter
private JobService jobService;
@Autowired
@Getter
private JobManager jobManager;
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter
private int maxConcurrentSessionsPerDevice;

View File

@ -24,6 +24,7 @@ import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
import org.thingsboard.rule.engine.api.NotificationCenter;
@ -93,6 +94,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.mobile.MobileAppBundleService;
import org.thingsboard.server.dao.mobile.MobileAppService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
@ -108,7 +110,6 @@ import org.thingsboard.server.dao.queue.QueueStatsService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
@ -895,6 +896,11 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getJobService();
}
@Override
public JobManager getJobManager() {
return mainCtx.getJobManager();
}
@Override
public boolean isExternalNodeForceAck() {
return mainCtx.isExternalNodeForceAck();

View File

@ -36,7 +36,7 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.job.JobManager;
import org.thingsboard.rule.engine.api.JobManager;
import java.util.List;
import java.util.UUID;

View File

@ -59,9 +59,8 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.service.job.JobManager;
import org.thingsboard.rule.engine.api.JobManager;
import java.util.Optional;
import java.util.Set;
@Slf4j
@ -72,7 +71,7 @@ public class EntityStateSourcingListener {
private final TenantService tenantService;
private final TbClusterService tbClusterService;
private final EdgeSynchronizationManager edgeSynchronizationManager;
private final Optional<JobManager> jobManager;
private final JobManager jobManager;
@PostConstruct
public void init() {
@ -303,7 +302,7 @@ public class EntityStateSourcingListener {
}
private void onJobUpdate(Job job) {
jobManager.ifPresent(jobManager -> jobManager.onJobUpdate(job));
jobManager.onJobUpdate(job);
ComponentLifecycleEvent event;
if (job.getResult().getCancellationTs() > 0) {

View File

@ -16,20 +16,18 @@
package org.thingsboard.server.service.job;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobResult;
import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.JobStatus;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.job.task.Task;
@ -41,28 +39,22 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.notification.DefaultNotifications;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.task.JobStatsService;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.task.TaskProducerQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -78,12 +70,10 @@ public class DefaultJobManager implements JobManager {
private final TasksQueueConfig queueConfig;
private final Map<JobType, JobProcessor> jobProcessors;
private final Map<JobType, TbQueueProducer<TbProtoQueueMsg<TaskProto>>> taskProducers;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService executor;
private final ExecutorService consumerExecutor;
public DefaultJobManager(JobService jobService, JobStatsService jobStatsService, NotificationCenter notificationCenter,
PartitionService partitionService, TbCoreQueueFactory queueFactory, TasksQueueConfig queueConfig,
PartitionService partitionService, TaskProducerQueueFactory queueFactory, TasksQueueConfig queueConfig,
List<JobProcessor> jobProcessors) {
this.jobService = jobService;
this.jobStatsService = jobStatsService;
@ -93,20 +83,6 @@ public class DefaultJobManager implements JobManager {
this.jobProcessors = jobProcessors.stream().collect(Collectors.toMap(JobProcessor::getType, Function.identity()));
this.taskProducers = Arrays.stream(JobType.values()).collect(Collectors.toMap(Function.identity(), queueFactory::createTaskProducer));
this.executor = ThingsBoardExecutors.newWorkStealingPool(Math.max(4, Runtime.getRuntime().availableProcessors()), getClass());
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer"));
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
.name("job-stats")
.msgPackProcessor(this::processStats)
.pollInterval(queueConfig.getStatsPollInterval())
.consumerCreator(queueFactory::createJobStatsConsumer)
.consumerExecutor(consumerExecutor)
.build();
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
jobStatsConsumer.subscribe();
jobStatsConsumer.launch();
}
@Override
@ -229,39 +205,6 @@ public class DefaultJobManager implements JobManager {
});
}
@SneakyThrows
private void processStats(List<TbProtoQueueMsg<JobStatsMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> consumer) {
Map<JobId, JobStats> stats = new HashMap<>();
for (TbProtoQueueMsg<JobStatsMsg> msg : msgs) {
JobStatsMsg statsMsg = msg.getValue();
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
JobId jobId = new JobId(new UUID(statsMsg.getJobIdMSB(), statsMsg.getJobIdLSB()));
JobStats jobStats = stats.computeIfAbsent(jobId, __ -> new JobStats(tenantId, jobId));
if (statsMsg.hasTaskResult()) {
TaskResult taskResult = JacksonUtil.fromString(statsMsg.getTaskResult().getValue(), TaskResult.class);
jobStats.getTaskResults().add(taskResult);
}
if (statsMsg.hasTotalTasksCount()) {
jobStats.setTotalTasksCount(statsMsg.getTotalTasksCount());
}
}
stats.forEach((jobId, jobStats) -> {
TenantId tenantId = jobStats.getTenantId();
try {
log.debug("[{}][{}] Processing job stats: {}", tenantId, jobId, stats);
jobService.processStats(tenantId, jobId, jobStats);
} catch (Exception e) {
log.error("[{}][{}] Failed to process job stats: {}", tenantId, jobId, jobStats, e);
}
});
consumer.commit();
Thread.sleep(queueConfig.getStatsProcessingInterval());
}
private void sendJobFinishedNotification(Job job) {
NotificationTemplate template = DefaultNotifications.DefaultNotification.builder()
.name("Job finished")
@ -284,9 +227,7 @@ public class DefaultJobManager implements JobManager {
@PreDestroy
private void destroy() {
jobStatsConsumer.stop();
executor.shutdownNow();
consumerExecutor.shutdownNow();
}
}

View File

@ -0,0 +1,115 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.job;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobStats;
import org.thingsboard.server.common.data.job.task.TaskResult;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@TbCoreComponent
@Component
@Slf4j
public class JobStatsProcessor {
private final JobService jobService;
private final TasksQueueConfig queueConfig;
private final QueueConsumerManager<TbProtoQueueMsg<JobStatsMsg>> jobStatsConsumer;
private final ExecutorService consumerExecutor;
public JobStatsProcessor(JobService jobService,
TasksQueueConfig queueConfig,
TbCoreQueueFactory queueFactory) {
this.jobService = jobService;
this.queueConfig = queueConfig;
this.consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("job-stats-consumer"));
this.jobStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<JobStatsMsg>>builder()
.name("job-stats")
.msgPackProcessor(this::processStats)
.pollInterval(queueConfig.getStatsPollInterval())
.consumerCreator(queueFactory::createJobStatsConsumer)
.consumerExecutor(consumerExecutor)
.build();
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void afterStartUp() {
jobStatsConsumer.subscribe();
jobStatsConsumer.launch();
}
@SneakyThrows
private void processStats(List<TbProtoQueueMsg<JobStatsMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> consumer) {
Map<JobId, JobStats> stats = new HashMap<>();
for (TbProtoQueueMsg<JobStatsMsg> msg : msgs) {
JobStatsMsg statsMsg = msg.getValue();
TenantId tenantId = TenantId.fromUUID(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
JobId jobId = new JobId(new UUID(statsMsg.getJobIdMSB(), statsMsg.getJobIdLSB()));
JobStats jobStats = stats.computeIfAbsent(jobId, __ -> new JobStats(tenantId, jobId));
if (statsMsg.hasTaskResult()) {
TaskResult taskResult = JacksonUtil.fromString(statsMsg.getTaskResult().getValue(), TaskResult.class);
jobStats.getTaskResults().add(taskResult);
}
if (statsMsg.hasTotalTasksCount()) {
jobStats.setTotalTasksCount(statsMsg.getTotalTasksCount());
}
}
stats.forEach((jobId, jobStats) -> {
TenantId tenantId = jobStats.getTenantId();
try {
log.debug("[{}][{}] Processing job stats: {}", tenantId, jobId, stats);
jobService.processStats(tenantId, jobId, jobStats);
} catch (Exception e) {
log.error("[{}][{}] Failed to process job stats: {}", tenantId, jobId, jobStats, e);
}
});
consumer.commit();
Thread.sleep(queueConfig.getStatsProcessingInterval());
}
@PreDestroy
private void destroy() {
jobStatsConsumer.stop();
consumerExecutor.shutdownNow();
}
}

View File

@ -23,6 +23,7 @@ import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.DummyJobConfiguration;

View File

@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -262,11 +261,6 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TaskProto>> createTaskProducer(JobType jobType) {
return new InMemoryTbQueueProducer<>(storage, jobType.getTasksTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return new InMemoryTbQueueConsumer<>(storage, tasksQueueConfig.getStatsTopic());

View File

@ -23,7 +23,6 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -31,7 +30,6 @@ import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -650,16 +648,6 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.clientId(jobType.name().toLowerCase() + "-task-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(jobType.getTasksTopic()))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()

View File

@ -22,12 +22,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -529,16 +527,6 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.clientId(jobType.name().toLowerCase() + "-task-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(jobType.getTasksTopic()))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()

View File

@ -17,10 +17,8 @@ package org.thingsboard.server.queue.provider;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -168,8 +166,6 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer();
TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType);
TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer();
}

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
@Component
@ConditionalOnExpression("'${queue.type:null}' == 'in-memory'")
@RequiredArgsConstructor
public class InMemoryTaskProducerQueueFactory implements TaskProducerQueueFactory {
private final InMemoryStorage storage;
@Override
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) {
return new InMemoryTbQueueProducer<>(storage, jobType.getTasksTopic());
}
}

View File

@ -0,0 +1,62 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
@Component
@ConditionalOnExpression("'${queue.type:null}' == 'kafka' && ('${service.type:null}' == 'monolith' || " +
"'${service.type:null}' == 'tb-core' || '${service.type:null}' == 'tb-rule-engine')")
public class KafkaTaskProducerQueueFactory implements TaskProducerQueueFactory {
private final TopicService topicService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbKafkaSettings kafkaSettings;
private final TbQueueAdmin tasksAdmin;
KafkaTaskProducerQueueFactory(TopicService topicService,
TbServiceInfoProvider serviceInfoProvider,
TbKafkaSettings kafkaSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType) {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.clientId(jobType.name().toLowerCase() + "-task-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName(jobType.getTasksTopic()))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
}

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
public interface TaskProducerQueueFactory {
TbQueueProducer<TbProtoQueueMsg<TaskProto>> createTaskProducer(JobType jobType);
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.job;
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.JobId;
import org.thingsboard.server.common.data.id.TenantId;

View File

@ -62,6 +62,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.mobile.MobileAppBundleService;
import org.thingsboard.server.dao.mobile.MobileAppService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
@ -77,7 +78,6 @@ import org.thingsboard.server.dao.queue.QueueStatsService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.job.JobService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
@ -365,6 +365,8 @@ public interface TbContext {
JobService getJobService();
JobManager getJobManager();
boolean isExternalNodeForceAck();
/**