diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index dace159774..a149ca6dfd 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -49,6 +49,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -419,10 +420,11 @@ public class HashPartitionServiceTest { } private HashPartitionService createPartitionService() { - HashPartitionService partitionService = new HashPartitionService(serviceInfoProvider, - routingInfoService, + HashPartitionService partitionService = new HashPartitionService( applicationEventPublisher, - queueRoutingInfoService, + serviceInfoProvider, + Optional.of(routingInfoService), + Optional.of(queueRoutingInfoService), topicService); ReflectionTestUtils.setField(partitionService, "coreTopic", "tb.core"); ReflectionTestUtils.setField(partitionService, "corePartitions", 10); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 4325651540..f244f99b6b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -66,13 +66,13 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { @Value("${service.rule_engine.assigned_tenant_profiles:}") private Set assignedTenantProfiles; - @Autowired + @Autowired(required = false) private EdqsConfig edqsConfig; @Autowired private ApplicationContext applicationContext; - @Autowired + @Autowired(required = false) private List> availableTaskProcessors; private List serviceTypes; @@ -102,9 +102,13 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { edqsConfig.setLabel(serviceId); } } - taskTypes = availableTaskProcessors.stream() - .map(TaskProcessor::getJobType) - .toList(); + if (CollectionsUtil.isNotEmpty(availableTaskProcessors)) { + taskTypes = availableTaskProcessors.stream() + .map(TaskProcessor::getJobType) + .toList(); + } else { + taskTypes = Collections.emptyList(); + } generateNewServiceInfoWithCurrentSystemInfo(); } @@ -141,7 +145,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { if (CollectionsUtil.isNotEmpty(assignedTenantProfiles)) { builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList())); } - builder.setLabel(edqsConfig.getLabel()); + if (edqsConfig != null) { + builder.setLabel(edqsConfig.getLabel()); + } builder.addAllTaskTypes(taskTypes.stream().map(JobType::name).toList()); return serviceInfo = builder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index ccbfbc9c79..3f81199488 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -19,6 +19,7 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import jakarta.annotation.PostConstruct; import lombok.Data; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Value; @@ -49,6 +50,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -63,6 +65,7 @@ import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; @Service @Slf4j +@RequiredArgsConstructor public class HashPartitionService implements PartitionService { @Value("${queue.core.topic:tb_core}") @@ -90,8 +93,8 @@ public class HashPartitionService implements PartitionService { private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; - private final TenantRoutingInfoService tenantRoutingInfoService; - private final QueueRoutingInfoService queueRoutingInfoService; + private final Optional tenantRoutingInfoService; + private final Optional queueRoutingInfoService; private final TopicService topicService; protected volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); @@ -108,18 +111,6 @@ public class HashPartitionService implements PartitionService { private HashFunction hashFunction; - public HashPartitionService(TbServiceInfoProvider serviceInfoProvider, - TenantRoutingInfoService tenantRoutingInfoService, - ApplicationEventPublisher applicationEventPublisher, - QueueRoutingInfoService queueRoutingInfoService, - TopicService topicService) { - this.serviceInfoProvider = serviceInfoProvider; - this.tenantRoutingInfoService = tenantRoutingInfoService; - this.applicationEventPublisher = applicationEventPublisher; - this.queueRoutingInfoService = queueRoutingInfoService; - this.topicService = topicService; - } - @PostConstruct public void init() { this.hashFunction = forName(hashFunctionName); @@ -178,6 +169,10 @@ public class HashPartitionService implements PartitionService { } private List getQueueRoutingInfos() { + if (queueRoutingInfoService.isEmpty()) { + return Collections.emptyList(); + } + List queueRoutingInfoList; String serviceType = serviceInfoProvider.getServiceType(); @@ -188,7 +183,7 @@ public class HashPartitionService implements PartitionService { if (getQueuesRetries > 0) { log.info("Try to get queue routing info."); try { - queueRoutingInfoList = queueRoutingInfoService.getAllQueuesRoutingInfo(); + queueRoutingInfoList = queueRoutingInfoService.get().getAllQueuesRoutingInfo(); break; } catch (Exception e) { log.info("Failed to get queues routing info: {}!", e.getMessage()); @@ -204,7 +199,7 @@ public class HashPartitionService implements PartitionService { } } } else { - queueRoutingInfoList = queueRoutingInfoService.getAllQueuesRoutingInfo(); + queueRoutingInfoList = queueRoutingInfoService.get().getAllQueuesRoutingInfo(); } return queueRoutingInfoList; } @@ -638,7 +633,11 @@ public class HashPartitionService implements PartitionService { } private TenantRoutingInfo getRoutingInfo(TenantId tenantId) { - return tenantRoutingInfoMap.computeIfAbsent(tenantId, tenantRoutingInfoService::getRoutingInfo); + if (tenantRoutingInfoService.isPresent()) { + return tenantRoutingInfoMap.computeIfAbsent(tenantId, __ -> tenantRoutingInfoService.get().getRoutingInfo(tenantId)); + } else { + return new TenantRoutingInfo(tenantId, null, false); + } } protected TenantId getIsolatedOrSystemTenantId(ServiceType serviceType, TenantId tenantId) { @@ -702,7 +701,7 @@ public class HashPartitionService implements PartitionService { if (!responsibleServices.isEmpty()) { // if there are any dedicated servers TenantProfileId profileId; if (tenantId != null && !tenantId.isSysTenantId()) { - TenantRoutingInfo routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId); + TenantRoutingInfo routingInfo = tenantRoutingInfoService.get().getRoutingInfo(tenantId); profileId = routingInfo.getProfileId(); } else { profileId = null; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index 9160818278..a164237366 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -265,16 +265,6 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE return new InMemoryTbQueueProducer<>(storage, jobType.getTasksTopic()); } - @Override - public TbQueueConsumer> createTaskConsumer(JobType jobType) { - return new InMemoryTbQueueConsumer<>(storage, jobType.getTasksTopic()); - } - - @Override - public TbQueueProducer> createJobStatsProducer() { - return new InMemoryTbQueueProducer<>(storage, "jobs.stats"); - } - @Override public TbQueueConsumer> createJobStatsConsumer() { return new InMemoryTbQueueConsumer<>(storage, "jobs.stats"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index e4b51eaa1f..b15e60f09d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -656,29 +656,6 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi .build(); } - @Override - public TbQueueConsumer> createTaskConsumer(JobType jobType) { - return TbKafkaConsumerTemplate.>builder() - .settings(kafkaSettings) - .topic(topicService.buildTopicName(jobType.getTasksTopic())) - .clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId()) - .groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders())) - .admin(tasksAdmin) - .statsService(consumerStatsService) - .build(); - } - - @Override - public TbQueueProducer> createJobStatsProducer() { - return TbKafkaProducerTemplate.>builder() - .clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName("jobs.stats")) - .settings(kafkaSettings) - .admin(tasksAdmin) - .build(); - } - @Override public TbQueueConsumer> createJobStatsConsumer() { return TbKafkaConsumerTemplate.>builder() diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 85d1e8be14..0bf6acd830 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -535,29 +535,6 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { .build(); } - @Override - public TbQueueConsumer> createTaskConsumer(JobType jobType) { - return TbKafkaConsumerTemplate.>builder() - .settings(kafkaSettings) - .topic(topicService.buildTopicName(jobType.getTasksTopic())) - .clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId()) - .groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders())) - .admin(tasksAdmin) - .statsService(consumerStatsService) - .build(); - } - - @Override - public TbQueueProducer> createJobStatsProducer() { - return TbKafkaProducerTemplate.>builder() - .clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName("jobs.stats")) - .settings(kafkaSettings) - .admin(tasksAdmin) - .build(); - } - @Override public TbQueueConsumer> createJobStatsConsumer() { return TbKafkaConsumerTemplate.>builder() diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index b2af3671c6..3b67ea4f9f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -22,15 +22,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg; -import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -99,7 +96,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin cfAdmin; private final TbQueueAdmin cfStateAdmin; private final TbQueueAdmin edqsEventsAdmin; - private final TbQueueAdmin tasksAdmin; private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -137,7 +133,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs()); this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs()); this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsEventsConfigs()); - this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs()); } @Override @@ -419,29 +414,6 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { throw new UnsupportedOperationException(); } - @Override - public TbQueueConsumer> createTaskConsumer(JobType jobType) { - return TbKafkaConsumerTemplate.>builder() - .settings(kafkaSettings) - .topic(topicService.buildTopicName(jobType.getTasksTopic())) - .clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId()) - .groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group")) - .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders())) - .admin(tasksAdmin) - .statsService(consumerStatsService) - .build(); - } - - @Override - public TbQueueProducer> createJobStatsProducer() { - return TbKafkaProducerTemplate.>builder() - .clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) - .defaultTopic(topicService.buildTopicName("jobs.stats")) - .settings(kafkaSettings) - .admin(tasksAdmin) - .build(); - } - @PreDestroy private void destroy() { if (coreAdmin != null) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index 823ebea298..e47354941c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -47,7 +47,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; * Responsible for initialization of various Producers and Consumers used by TB Core Node. * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable */ -public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory, TaskProcessorQueueFactory { +public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory { /** * Used to push messages to instances of TB Transport Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java index 9900474a10..98a3d78304 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueProducerProvider.java @@ -18,7 +18,6 @@ package org.thingsboard.server.queue.provider; import jakarta.annotation.PostConstruct; import org.springframework.stereotype.Service; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -54,7 +53,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toHousekeeper; private TbQueueProducer> toCalculatedFields; private TbQueueProducer> toCalculatedFieldNotifications; - private TbQueueProducer> jobStatsProducer; public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) { this.tbQueueProvider = tbQueueProvider; @@ -75,7 +73,6 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer(); this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer(); this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer(); - this.jobStatsProducer = tbQueueProvider.createJobStatsProducer(); } @Override @@ -143,9 +140,4 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider { return toCalculatedFieldNotifications; } - @Override - public TbQueueProducer> getJobStatsProducer() { - return jobStatsProducer; - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java index 428e673fa8..865637b2ff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbQueueProducerProvider.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.queue.provider; -import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -98,6 +97,4 @@ public interface TbQueueProducerProvider { TbQueueProducer> getCalculatedFieldsNotificationsMsgProducer(); - TbQueueProducer> getJobStatsProducer(); - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java index 9e77a2d4e7..8e1952fc14 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineProducerProvider.java @@ -18,7 +18,6 @@ package org.thingsboard.server.queue.provider; import jakarta.annotation.PostConstruct; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -52,7 +51,6 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { private TbQueueProducer> toEdgeEvents; private TbQueueProducer> toCalculatedFields; private TbQueueProducer> toCalculatedFieldNotifications; - private TbQueueProducer> jobStatsProducer; public TbRuleEngineProducerProvider(TbRuleEngineQueueFactory tbQueueProvider) { this.tbQueueProvider = tbQueueProvider; @@ -72,7 +70,6 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { this.toEdgeEvents = tbQueueProvider.createEdgeEventMsgProducer(); this.toCalculatedFields = tbQueueProvider.createToCalculatedFieldMsgProducer(); this.toCalculatedFieldNotifications = tbQueueProvider.createToCalculatedFieldNotificationMsgProducer(); - this.jobStatsProducer = tbQueueProvider.createJobStatsProducer(); } @Override @@ -140,9 +137,4 @@ public class TbRuleEngineProducerProvider implements TbQueueProducerProvider { return toCalculatedFieldNotifications; } - @Override - public TbQueueProducer> getJobStatsProducer() { - return jobStatsProducer; - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java index 83c467c992..18bb6db14a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbRuleEngineQueueFactory.java @@ -41,7 +41,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; * Responsible for initialization of various Producers and Consumers used by TB Core Node. * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable */ -public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory, TaskProcessorQueueFactory { +public interface TbRuleEngineQueueFactory extends TbUsageStatsClientQueueFactory, HousekeeperClientQueueFactory, EdqsClientQueueFactory { /** * Used to push messages to instances of TB Transport Service diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java index 4472c6157e..cb7e6dd1f4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java @@ -122,9 +122,4 @@ public class TbTransportQueueProducerProvider implements TbQueueProducerProvider throw new RuntimeException("Not Implemented! Should not be used by Transport!"); } - @Override - public TbQueueProducer> getJobStatsProducer() { - throw new RuntimeException("Not Implemented! Should not be used by Transport!"); - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java index 0370f8a4af..85c400d094 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbVersionControlProducerProvider.java @@ -118,9 +118,4 @@ public class TbVersionControlProducerProvider implements TbQueueProducerProvider throw new RuntimeException("Not Implemented! Should not be used by Version Control Service!"); } - @Override - public TbQueueProducer> getJobStatsProducer() { - throw new RuntimeException("Not Implemented! Should not be used by Version Control Service!"); - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java new file mode 100644 index 0000000000..dbe302dfee --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/InMemoryTaskProcessorQueueFactory.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.task; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.memory.InMemoryStorage; +import org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer; +import org.thingsboard.server.queue.memory.InMemoryTbQueueProducer; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='in-memory'") +@RequiredArgsConstructor +public class InMemoryTaskProcessorQueueFactory implements TaskProcessorQueueFactory { + + private final InMemoryStorage storage; + + @Override + public TbQueueConsumer> createTaskConsumer(JobType jobType) { + return new InMemoryTbQueueConsumer<>(storage, jobType.getTasksTopic()); + } + + @Override + public TbQueueProducer> createJobStatsProducer() { + return new InMemoryTbQueueProducer<>(storage, "jobs.stats"); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java index c3780f15e7..28d08f593c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/JobStatsService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.queue.task; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -29,15 +28,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.TaskResultProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; @Lazy @Service @Slf4j -@RequiredArgsConstructor public class JobStatsService { - private final TbQueueProducerProvider producerProvider; + private final TbQueueProducer> producer; + + public JobStatsService(TaskProcessorQueueFactory queueFactory) { + this.producer = queueFactory.createJobStatsProducer(); + } public void reportTaskResult(TenantId tenantId, JobId jobId, TaskResult result) { report(tenantId, jobId, JobStatsMsg.newBuilder() @@ -59,7 +60,6 @@ public class JobStatsService { .setJobIdLSB(jobId.getId().getLeastSignificantBits()); TbProtoQueueMsg msg = new TbProtoQueueMsg<>(jobId.getId(), statsMsg.build()); - TbQueueProducer> producer = producerProvider.getJobStatsProducer(); producer.send(TopicPartitionInfo.builder().topic(producer.getDefaultTopic()).build(), msg, TbQueueCallback.EMPTY); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java new file mode 100644 index 0000000000..77a47a20c8 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/KafkaTaskProcessorQueueFactory.java @@ -0,0 +1,82 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.task; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TaskProto; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.TbQueueConsumer; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; +import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; +import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate; +import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; + +@Component +@ConditionalOnExpression("'${queue.type:null}'=='kafka'") +public class KafkaTaskProcessorQueueFactory implements TaskProcessorQueueFactory { + + private final TopicService topicService; + private final TbServiceInfoProvider serviceInfoProvider; + private final TbKafkaSettings kafkaSettings; + private final TbKafkaConsumerStatsService consumerStatsService; + + private final TbQueueAdmin tasksAdmin; + + public KafkaTaskProcessorQueueFactory(TopicService topicService, + TbServiceInfoProvider serviceInfoProvider, + TbKafkaSettings kafkaSettings, + TbKafkaConsumerStatsService consumerStatsService, + TbKafkaTopicConfigs kafkaTopicConfigs) { + this.serviceInfoProvider = serviceInfoProvider; + this.kafkaSettings = kafkaSettings; + this.topicService = topicService; + this.consumerStatsService = consumerStatsService; + this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs()); + } + + @Override + public TbQueueConsumer> createTaskConsumer(JobType jobType) { + return TbKafkaConsumerTemplate.>builder() + .settings(kafkaSettings) + .topic(topicService.buildTopicName(jobType.getTasksTopic())) + .clientId(jobType.name().toLowerCase() + "-task-consumer-" + serviceInfoProvider.getServiceId()) + .groupId(topicService.buildTopicName(jobType.name().toLowerCase() + "-task-consumer-group")) + .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TaskProto.parseFrom(msg.getData()), msg.getHeaders())) + .admin(tasksAdmin) + .statsService(consumerStatsService) + .build(); + } + + @Override + public TbQueueProducer> createJobStatsProducer() { + return TbKafkaProducerTemplate.>builder() + .clientId("job-stats-producer-" + serviceInfoProvider.getServiceId()) + .defaultTopic(topicService.buildTopicName("jobs.stats")) + .settings(kafkaSettings) + .admin(tasksAdmin) + .build(); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index 82f13ce66b..319ddf4ab9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -37,7 +37,6 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; -import org.thingsboard.server.queue.provider.TaskProcessorQueueFactory; import java.util.List; import java.util.Set; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TaskProcessorQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorQueueFactory.java similarity index 96% rename from common/queue/src/main/java/org/thingsboard/server/queue/provider/TaskProcessorQueueFactory.java rename to common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorQueueFactory.java index 571b14639c..c5e8035d74 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TaskProcessorQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessorQueueFactory.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.queue.provider; +package org.thingsboard.server.queue.task; import org.thingsboard.server.common.data.job.JobType; import org.thingsboard.server.gen.transport.TransportProtos.JobStatsMsg; diff --git a/edqs/src/main/java/org/thingsboard/server/edqs/DummyQueueRoutingInfoService.java b/edqs/src/main/java/org/thingsboard/server/edqs/DummyQueueRoutingInfoService.java deleted file mode 100644 index 1f1152af68..0000000000 --- a/edqs/src/main/java/org/thingsboard/server/edqs/DummyQueueRoutingInfoService.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.edqs; - -import org.springframework.stereotype.Service; -import org.thingsboard.server.queue.discovery.QueueRoutingInfo; -import org.thingsboard.server.queue.discovery.QueueRoutingInfoService; - -import java.util.Collections; -import java.util.List; - -@Service -public class DummyQueueRoutingInfoService implements QueueRoutingInfoService { - - @Override - public List getAllQueuesRoutingInfo() { - return Collections.emptyList(); - } - -} diff --git a/edqs/src/main/java/org/thingsboard/server/edqs/DummyTenantRoutingInfoService.java b/edqs/src/main/java/org/thingsboard/server/edqs/DummyTenantRoutingInfoService.java deleted file mode 100644 index 4e16e5e16a..0000000000 --- a/edqs/src/main/java/org/thingsboard/server/edqs/DummyTenantRoutingInfoService.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.edqs; - -import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.queue.discovery.TenantRoutingInfo; -import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; - -@Service -public class DummyTenantRoutingInfoService implements TenantRoutingInfoService { - @Override - public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { - return null; - } - -}