Refactoring for task processor queue factories

This commit is contained in:
ViacheslavKlimov 2025-05-02 16:23:05 +03:00
parent e59460b42b
commit 4c01b3d70a
21 changed files with 172 additions and 212 deletions

View File

@ -49,6 +49,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@ -419,10 +420,11 @@ public class HashPartitionServiceTest {
}
private HashPartitionService createPartitionService() {
HashPartitionService partitionService = new HashPartitionService(serviceInfoProvider,
routingInfoService,
HashPartitionService partitionService = new HashPartitionService(
applicationEventPublisher,
queueRoutingInfoService,
serviceInfoProvider,
Optional.of(routingInfoService),
Optional.of(queueRoutingInfoService),
topicService);
ReflectionTestUtils.setField(partitionService, "coreTopic", "tb.core");
ReflectionTestUtils.setField(partitionService, "corePartitions", 10);

View File

@ -66,13 +66,13 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
@Value("${service.rule_engine.assigned_tenant_profiles:}")
private Set<UUID> assignedTenantProfiles;
@Autowired
@Autowired(required = false)
private EdqsConfig edqsConfig;
@Autowired
private ApplicationContext applicationContext;
@Autowired
@Autowired(required = false)
private List<TaskProcessor<?, ?>> availableTaskProcessors;
private List<ServiceType> serviceTypes;
@ -102,9 +102,13 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
edqsConfig.setLabel(serviceId);
}
}
taskTypes = availableTaskProcessors.stream()
.map(TaskProcessor::getJobType)
.toList();
if (CollectionsUtil.isNotEmpty(availableTaskProcessors)) {
taskTypes = availableTaskProcessors.stream()
.map(TaskProcessor::getJobType)
.toList();
} else {
taskTypes = Collections.emptyList();
}
generateNewServiceInfoWithCurrentSystemInfo();
}
@ -141,7 +145,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
if (CollectionsUtil.isNotEmpty(assignedTenantProfiles)) {
builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList()));
}
builder.setLabel(edqsConfig.getLabel());
if (edqsConfig != null) {
builder.setLabel(edqsConfig.getLabel());
}
builder.addAllTaskTypes(taskTypes.stream().map(JobType::name).toList());
return serviceInfo = builder.build();
}

View File

@ -19,6 +19,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
@ -49,6 +50,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -63,6 +65,7 @@ import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
@Service
@Slf4j
@RequiredArgsConstructor
public class HashPartitionService implements PartitionService {
@Value("${queue.core.topic:tb_core}")
@ -90,8 +93,8 @@ public class HashPartitionService implements PartitionService {
private final ApplicationEventPublisher applicationEventPublisher;
private final TbServiceInfoProvider serviceInfoProvider;
private final TenantRoutingInfoService tenantRoutingInfoService;
private final QueueRoutingInfoService queueRoutingInfoService;
private final Optional<TenantRoutingInfoService> tenantRoutingInfoService;
private final Optional<QueueRoutingInfoService> queueRoutingInfoService;
private final TopicService topicService;
protected volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
@ -108,18 +111,6 @@ public class HashPartitionService implements PartitionService {
private HashFunction hashFunction;
public HashPartitionService(TbServiceInfoProvider serviceInfoProvider,
TenantRoutingInfoService tenantRoutingInfoService,
ApplicationEventPublisher applicationEventPublisher,
QueueRoutingInfoService queueRoutingInfoService,
TopicService topicService) {
this.serviceInfoProvider = serviceInfoProvider;
this.tenantRoutingInfoService = tenantRoutingInfoService;
this.applicationEventPublisher = applicationEventPublisher;
this.queueRoutingInfoService = queueRoutingInfoService;
this.topicService = topicService;
}
@PostConstruct
public void init() {
this.hashFunction = forName(hashFunctionName);
@ -178,6 +169,10 @@ public class HashPartitionService implements PartitionService {
}
private List<QueueRoutingInfo> getQueueRoutingInfos() {
if (queueRoutingInfoService.isEmpty()) {
return Collections.emptyList();
}
List<QueueRoutingInfo> queueRoutingInfoList;
String serviceType = serviceInfoProvider.getServiceType();
@ -188,7 +183,7 @@ public class HashPartitionService implements PartitionService {
if (getQueuesRetries > 0) {
log.info("Try to get queue routing info.");
try {
queueRoutingInfoList = queueRoutingInfoService.getAllQueuesRoutingInfo();
queueRoutingInfoList = queueRoutingInfoService.get().getAllQueuesRoutingInfo();
break;
} catch (Exception e) {
log.info("Failed to get queues routing info: {}!", e.getMessage());
@ -204,7 +199,7 @@ public class HashPartitionService implements PartitionService {
}
}
} else {
queueRoutingInfoList = queueRoutingInfoService.getAllQueuesRoutingInfo();
queueRoutingInfoList = queueRoutingInfoService.get().getAllQueuesRoutingInfo();
}
return queueRoutingInfoList;
}
@ -638,7 +633,11 @@ public class HashPartitionService implements PartitionService {
}
private TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
return tenantRoutingInfoMap.computeIfAbsent(tenantId, tenantRoutingInfoService::getRoutingInfo);
if (tenantRoutingInfoService.isPresent()) {
return tenantRoutingInfoMap.computeIfAbsent(tenantId, __ -> tenantRoutingInfoService.get().getRoutingInfo(tenantId));
} else {
return new TenantRoutingInfo(tenantId, null, false);
}
}
protected TenantId getIsolatedOrSystemTenantId(ServiceType serviceType, TenantId tenantId) {
@ -702,7 +701,7 @@ public class HashPartitionService implements PartitionService {
if (!responsibleServices.isEmpty()) { // if there are any dedicated servers
TenantProfileId profileId;
if (tenantId != null && !tenantId.isSysTenantId()) {
TenantRoutingInfo routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId);
TenantRoutingInfo routingInfo = tenantRoutingInfoService.get().getRoutingInfo(tenantId);
profileId = routingInfo.getProfileId();
} else {
profileId = null;

View File

@ -265,16 +265,6 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
return new InMemoryTbQueueProducer<>(storage, jobType.getTasksTopic());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TaskProto>> createTaskConsumer(JobType jobType) {
return new InMemoryTbQueueConsumer<>(storage, jobType.getTasksTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return new InMemoryTbQueueProducer<>(storage, "jobs.stats");
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return new InMemoryTbQueueConsumer<>(storage, "jobs.stats");

View File

@ -656,29 +656,6 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(jobType.getTasksTopic()))
.clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(tasksAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName("jobs.stats"))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()

View File

@ -535,29 +535,6 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(jobType.getTasksTopic()))
.clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(tasksAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName("jobs.stats"))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsConsumer() {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()

View File

@ -22,15 +22,12 @@ import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -99,7 +96,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueAdmin cfAdmin;
private final TbQueueAdmin cfStateAdmin;
private final TbQueueAdmin edqsEventsAdmin;
private final TbQueueAdmin tasksAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
@ -137,7 +133,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs());
this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsEventsConfigs());
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
}
@Override
@ -419,29 +414,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
throw new UnsupportedOperationException();
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(jobType.getTasksTopic()))
.clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(tasksAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName("jobs.stats"))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
@PreDestroy
private void destroy() {
if (coreAdmin != null) {

View File

@ -47,7 +47,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
* Responsible for initialization of various Producers and Consumers used by TB Core Node.
* Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable
*/
public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory, TaskProcessorQueueFactory {
public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory {
/**
* Used to push messages to instances of TB Transport Service

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.queue.provider;
import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -54,7 +53,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> toCalculatedFields;
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> toCalculatedFieldNotifications;
private TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> jobStatsProducer;
public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {
this.tbQueueProvider = tbQueueProvider;
@ -75,7 +73,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer();
this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer();
this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer();
this.jobStatsProducer = tbQueueProvider.createJobStatsProducer();
}
@Override
@ -143,9 +140,4 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
return toCalculatedFieldNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> getJobStatsProducer() {
return jobStatsProducer;
}
}

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.queue.provider;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -98,6 +97,4 @@ public interface TbQueueProducerProvider {
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> getCalculatedFieldsNotificationsMsgProducer();
TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> getJobStatsProducer();
}

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.queue.provider;
import jakarta.annotation.PostConstruct;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -52,7 +51,6 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
private TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> toEdgeEvents;
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> toCalculatedFields;
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> toCalculatedFieldNotifications;
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> jobStatsProducer;
public TbRuleEngineProducerProvider(TbRuleEngineQueueFactory tbQueueProvider) {
this.tbQueueProvider = tbQueueProvider;
@ -72,7 +70,6 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer();
this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer();
this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer();
this.jobStatsProducer = tbQueueProvider.createJobStatsProducer();
}
@Override
@ -140,9 +137,4 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider {
return toCalculatedFieldNotifications;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> getJobStatsProducer() {
return jobStatsProducer;
}
}

View File

@ -41,7 +41,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
* Responsible for initialization of various Producers and Consumers used by TB Core Node.
* Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable
*/
public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory, TaskProcessorQueueFactory {
public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory {
/**
* Used to push messages to instances of TB Transport Service

View File

@ -122,9 +122,4 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider
throw new RuntimeException("Not Implemented! Should not be used by Transport!");
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> getJobStatsProducer() {
throw new RuntimeException("Not Implemented! Should not be used by Transport!");
}
}

View File

@ -118,9 +118,4 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider
throw new RuntimeException("Not Implemented! Should not be used by Version Control Service!");
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> getJobStatsProducer() {
throw new RuntimeException("Not Implemented! Should not be used by Version Control Service!");
}
}

View File

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryStorage;
import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer;
import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'")
@RequiredArgsConstructor
public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFactory {
private final InMemoryStorage storage;
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
return new InMemoryTbQueueConsumer<>(storage, jobType.getTasksTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return new InMemoryTbQueueProducer<>(storage, "jobs.stats");
}
}

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.queue.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@ -29,15 +28,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
@Lazy
@Service
@Slf4j
@RequiredArgsConstructor
public class JobStatsService {
private final TbQueueProducerProvider producerProvider;
private final TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> producer;
public JobStatsService(TaskProcessorQueueFactory queueFactory) {
this.producer = queueFactory.createJobStatsProducer();
}
public void reportTaskResult(TenantId tenantId, JobId jobId, TaskResult result) {
report(tenantId, jobId, JobStatsMsg.newBuilder()
@ -59,7 +60,6 @@ public class JobStatsService {
.setJobIdLSB(jobId.getId().getLeastSignificantBits());
TbProtoQueueMsg<JobStatsMsg> msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build());
TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> producer = producerProvider.getJobStatsProducer();
producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY);
}

View File

@ -0,0 +1,82 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.task;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TaskProto;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka'")
public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory {
private final TopicService topicService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbKafkaSettings kafkaSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueAdmin tasksAdmin;
public KafkaTaskProcessorQueueFactory(TopicService topicService,
TbServiceInfoProvider serviceInfoProvider,
TbKafkaSettings kafkaSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.serviceInfoProvider = serviceInfoProvider;
this.kafkaSettings = kafkaSettings;
this.topicService = topicService;
this.consumerStatsService = consumerStatsService;
this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TaskProto>> createTaskConsumer(JobType jobType) {
return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TaskProto>>builder()
.settings(kafkaSettings)
.topic(topicService.buildTopicName(jobType.getTasksTopic()))
.clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId())
.groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group"))
.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders()))
.admin(tasksAdmin)
.statsService(consumerStatsService)
.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<JobStatsMsg>> createJobStatsProducer() {
return TbKafkaProducerTemplate.<TbProtoQueueMsg<JobStatsMsg>>builder()
.clientId("job-stats-producer-" + serviceInfoProvider.getServiceId())
.defaultTopic(topicService.buildTopicName("jobs.stats"))
.settings(kafkaSettings)
.admin(tasksAdmin)
.build();
}
}

View File

@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory;
import java.util.List;
import java.util.Set;

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.provider;
package org.thingsboard.server.queue.task;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg;

View File

@ -1,33 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs;
import org.springframework.stereotype.Service;
import org.thingsboard.server.queue.discovery.QueueRoutingInfo;
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
import java.util.Collections;
import java.util.List;
@Service
public class DummyQueueRoutingInfoService implements QueueRoutingInfoService {
@Override
public List<QueueRoutingInfo> getAllQueuesRoutingInfo() {
return Collections.emptyList();
}
}

View File

@ -1,30 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edqs;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@Service
public class DummyTenantRoutingInfoService implements TenantRoutingInfoService {
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
return null;
}
}