diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 21459814b7..46ab8f25de 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1696,14 +1696,14 @@ queue: print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}" edqs: sync: - # Enable/disable EDQS synchronization FIXME: disable by default before release - enabled: "${TB_EDQS_SYNC_ENABLED:true}" + # Enable/disable EDQS synchronization + enabled: "${TB_EDQS_SYNC_ENABLED:false}" # Batch size of entities being synced with EDQS entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}" # Batch size of timeseries data being synced with EDQS ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}" - # Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation) FIXME: disable by default before release - api_enabled: "${TB_EDQS_API_ENABLED:true}" + # Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation) + api_enabled: "${TB_EDQS_API_ENABLED:false}" # Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices) mode: "${TB_EDQS_MODE:local}" local: @@ -1727,7 +1727,7 @@ queue: # Enable/disable statistics for EDQS enabled: "${TB_EDQS_STATS_ENABLED:true}" # Statistics printing interval for EDQS - print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}" + print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}" vc: # Default topic name diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index efc290d3c5..e4948330a5 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.edqs.state.EdqsStateService; import org.thingsboard.server.edqs.util.EdqsRocksDb; import java.util.concurrent.TimeUnit; @@ -34,7 +35,7 @@ import static org.awaitility.Awaitility.await; @DaoSqlTest @TestPropertySource(properties = { // "queue.type=kafka", // uncomment to use Kafka -// "queue.kafka.bootstrap.servers=192.168.0.105:9092", +// "queue.kafka.bootstrap.servers=10.7.1.254:9092", "queue.edqs.sync.enabled=true", "queue.edqs.api_enabled=true", "queue.edqs.mode=local" @@ -44,12 +45,15 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { @Autowired private EdqsService edqsService; + @Autowired(required = false) + private EdqsStateService edqsStateService; + @MockBean // so that we don't do backup for tests private EdqsRocksDb edqsRocksDb; @Before public void before() { - await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled()); + await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled() && edqsStateService.isReady()); } @Override diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 42e00be7fc..9349946a80 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -24,9 +24,7 @@ import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Lazy; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ExceptionUtil; @@ -90,8 +88,7 @@ public class EdqsProcessor implements TbQueueHandler, private final EdqsConfig config; private final EdqsPartitionService partitionService; private final ConfigurableApplicationContext applicationContext; - @Autowired @Lazy - private EdqsStateService stateService; + private final EdqsStateService stateService; private PartitionedQueueConsumerManager> eventConsumer; private TbQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java index 043c0bd6c3..9003eb4da2 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/repo/TenantRepo.java @@ -105,7 +105,7 @@ public class TenantRepo { public void processEvent(EdqsEvent event) { EdqsObject edqsObject = event.getObject(); - log.debug("[{}] Processing event: {}", tenantId, event); + log.trace("[{}] Processing event: {}", tenantId, event); if (event.getEventType() == EdqsEventType.UPDATED) { addOrUpdate(edqsObject); } else if (event.getEventType() == EdqsEventType.DELETED) { diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java index 1966618d4b..bb0aa67239 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/EdqsStateService.java @@ -33,6 +33,8 @@ public interface EdqsStateService { void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg); + boolean isReady(); + void stop(); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index f5ba2ba841..05eebb188c 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -17,6 +17,8 @@ package org.thingsboard.server.edqs.state; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; @@ -52,8 +54,9 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsConfig config; private final EdqsPartitionService partitionService; private final EdqsQueueFactory queueFactory; - private final EdqsProcessor edqsProcessor; private final TopicService topicService; + @Autowired @Lazy + private EdqsProcessor edqsProcessor; private PartitionedQueueConsumerManager> stateConsumer; private QueueStateService, TbProtoQueueMsg> queueStateService; @@ -63,6 +66,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final VersionsStore versionsStore = new VersionsStore(); private final AtomicInteger stateReadCount = new AtomicInteger(); private final AtomicInteger eventsReadCount = new AtomicInteger(); + private Boolean ready; @Override public void init(PartitionedQueueConsumerManager> eventConsumer) { @@ -72,9 +76,6 @@ public class KafkaEdqsStateService implements EdqsStateService { .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer, config) -> { for (TbProtoQueueMsg queueMsg : msgs) { - if (consumer.isStopped()) { - return; - } try { ToEdqsMsg msg = queueMsg.getValue(); edqsProcessor.process(msg, EdqsQueue.STATE); @@ -124,7 +125,7 @@ public class KafkaEdqsStateService implements EdqsStateService { TenantId tenantId = getTenantId(msg); ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); - log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); + log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); stateProducer.send(tenantId, objectType, key, msg); } } catch (Throwable t) { @@ -160,6 +161,18 @@ public class KafkaEdqsStateService implements EdqsStateService { // do nothing here, backup is done by events consumer } + @Override + public boolean isReady() { + if (ready == null) { + Set partitionsInProgress = queueStateService.getPartitionsInProgress(); + if (partitionsInProgress != null && partitionsInProgress.isEmpty()) { + ready = true; // once true - always true, not to change readiness status on each repartitioning + } + } + log.error("ready: {}", ready); + return ready != null && ready; + } + private TenantId getTenantId(ToEdqsMsg edqsMsg) { return TenantId.fromUUID(new UUID(edqsMsg.getTenantIdMSB(), edqsMsg.getTenantIdLSB())); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java index 6869f14b74..c1620e5ec7 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/LocalEdqsStateService.java @@ -80,6 +80,11 @@ public class LocalEdqsStateService implements EdqsStateService { } } + @Override + public boolean isReady() { + return partitions != null; + } + @Override public void stop() { } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java index 9619d086ad..d28334c5d9 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/stats/EdqsStatsService.java @@ -43,9 +43,12 @@ public class EdqsStatsService { private final ConcurrentHashMap statsMap = new ConcurrentHashMap<>(); private final StatsFactory statsFactory; - @Scheduled(initialDelayString = "${queue.edqs.stats.print-interval-ms:60000}", - fixedDelayString = "${queue.edqs.stats.print-interval-ms:60000}") + @Scheduled(initialDelayString = "${queue.edqs.stats.print-interval-ms:300000}", + fixedDelayString = "${queue.edqs.stats.print-interval-ms:300000}") private void reportStats() { + if (statsMap.isEmpty()) { + return; + } String values = statsMap.entrySet().stream() .map(kv -> "TenantId [" + kv.getKey() + "] stats [" + kv.getValue() + "]") .collect(Collectors.joining(System.lineSeparator())); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java index 3bb489f209..da7f16ec0b 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TopicPartitionInfo.java @@ -17,13 +17,11 @@ package org.thingsboard.server.common.msg.queue; import lombok.Builder; import lombok.Getter; -import lombok.ToString; import org.thingsboard.server.common.data.id.TenantId; import java.util.Objects; import java.util.Optional; -@ToString public class TopicPartitionInfo { private final String topic; @@ -97,4 +95,13 @@ public class TopicPartitionInfo { return Objects.hash(fullTopicName, partition); } + @Override + public String toString() { + String str = fullTopicName; + if (useInternalPartition) { + str += "[" + partition + "]"; + } + return str; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java index df683b94cd..a73ee17bd2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/MainQueueConsumerManager.java @@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.stream.Collectors; @Slf4j public class MainQueueConsumerManager { @@ -270,12 +269,6 @@ public class MainQueueConsumerManager partitions) { - return partitions.stream().map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ? - "[" + tpi.getPartition().orElse(-1) + "]" : "")) - .collect(Collectors.joining(", ", "[", "]")); - } - public interface MsgPackProcessor { void process(List msgs, TbQueueConsumer consumer, C config) throws Exception; } @@ -299,7 +292,7 @@ public class MainQueueConsumerManager removedPartitions = new HashSet<>(consumers.keySet()); removedPartitions.removeAll(partitions); - log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions)); + log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, addedPartitions, removedPartitions); removePartitions(removedPartitions); addPartitions(addedPartitions, null); } @@ -333,7 +326,7 @@ public class MainQueueConsumerManager partitions) { - log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions)); + log.info("[{}] New partitions: {}", queueKey, partitions); if (partitions.isEmpty()) { if (consumer != null && consumer.isRunning()) { consumer.initiateStop(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java index 1b47d4c073..6ab0668f30 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/PartitionedQueueConsumerManager.java @@ -52,10 +52,10 @@ public class PartitionedQueueConsumerManager extends MainQ @Override protected void processTask(TbQueueConsumerManagerTask task) { if (task instanceof AddPartitionsTask addPartitionsTask) { - log.info("[{}] Added partitions: {}", queueKey, partitionsToString(addPartitionsTask.partitions())); + log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions()); consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop()); } else if (task instanceof RemovePartitionsTask removePartitionsTask) { - log.info("[{}] Removed partitions: {}", queueKey, partitionsToString(removePartitionsTask.partitions())); + log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions()); consumerWrapper.removePartitions(removePartitionsTask.partitions()); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java index d022ad14de..00015457c1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/consumer/QueueStateService.java @@ -16,16 +16,19 @@ package org.thingsboard.server.queue.common.consumer; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.TbQueueMsg; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +@Slf4j public class QueueStateService { private PartitionedQueueConsumerManager stateConsumer; @@ -33,6 +36,9 @@ public class QueueStateService { @Getter private Set partitions; + private final Set partitionsInProgress = ConcurrentHashMap.newKeySet(); + private boolean initialized; + private final Lock lock = new ReentrantLock(); public void init(PartitionedQueueConsumerManager stateConsumer, PartitionedQueueConsumerManager eventConsumer) { @@ -62,9 +68,15 @@ public class QueueStateService { } if (!addedPartitions.isEmpty()) { + partitionsInProgress.addAll(addedPartitions); stateConsumer.addPartitions(addedPartitions, partition -> { lock.lock(); try { + partitionsInProgress.remove(partition); + log.info("Finished partition {} (still in progress: {})", partition, partitionsInProgress); + if (partitionsInProgress.isEmpty()) { + log.info("All partitions processed"); + } if (this.partitions.contains(partition)) { eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic()))); } @@ -73,10 +85,15 @@ public class QueueStateService { } }); } + initialized = true; } private Set withTopic(Set partitions, String topic) { return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet()); } + public Set getPartitionsInProgress() { + return initialized ? partitionsInProgress : null; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index ea9ba3f245..5787516b25 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -59,7 +59,7 @@ import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; @Slf4j public class HashPartitionService implements PartitionService { - @Value("${queue.core.topic}") + @Value("${queue.core.topic:tb_core}") private String coreTopic; @Value("${queue.core.partitions:10}") private Integer corePartitions; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java index 27a1e09f79..ba113495d1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java @@ -21,7 +21,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) -// TODO: tb-core ? @ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " + "(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " + "'${queue.edqs.mode:null}'=='local'))") diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 47c776c742..9b580c762e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -54,6 +54,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue private final boolean readFromBeginning; // reset offset to beginning private final boolean stopWhenRead; // stop consuming when reached an empty msg pack + private int readCount; private Map endOffsets; // needed if stopWhenRead is true private boolean partitionsAssigned = false; @@ -152,6 +153,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue records.forEach(record -> { recordList.add(record); if (stopWhenRead) { + readCount++; int partition = record.partition(); Long endOffset = endOffsets.get(partition); if (endOffset == null) { @@ -166,7 +168,7 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue }); } if (stopWhenRead && endOffsets.isEmpty()) { - log.info("Reached end offset for {}, stopping consumer", consumer.assignment()); + log.info("Finished reading {}, processed {} messages", partitions, readCount); stop(); } return recordList; diff --git a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java index 2fc7970fc9..e1940e55fe 100644 --- a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java +++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java @@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory { private static final Counter STUB_COUNTER = new StubCounter(); - @Autowired(required = false) // FIXME Slavik !!! + @Autowired // FIXME Slavik !!! private MeterRegistry meterRegistry; @Value("${metrics.enabled:false}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index 3f941b776c..1a0f3be1e7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -142,12 +142,12 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe private EdqsResponse processEdqsRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) { EdqsResponse response; try { - log.info("Sending request to EDQS: {}", request); + log.debug("[{}] Sending request to EDQS: {}", tenantId, request); response = edqsService.processRequest(tenantId, customerId, request).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } - log.info("Received response from EDQS: {}", response); + log.debug("[{}] Received response from EDQS: {}", tenantId, response); if (response.getError() != null) { throw new RuntimeException(response.getError()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java index b69aa52ec5..9586cb62f8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java @@ -16,7 +16,7 @@ package org.thingsboard.server.dao.sql.query; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsObject; @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.edqs.EdqsService; @Service -@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "false", matchIfMissing = true) +@ConditionalOnMissingBean(value = EdqsService.class, ignored = DummyEdqsService.class) public class DummyEdqsService implements EdqsService { @Override diff --git a/edqs/src/main/java/org/thingsboard/server/edqs/EdqsController.java b/edqs/src/main/java/org/thingsboard/server/edqs/EdqsController.java new file mode 100644 index 0000000000..f2e686a84c --- /dev/null +++ b/edqs/src/main/java/org/thingsboard/server/edqs/EdqsController.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edqs; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.thingsboard.server.edqs.state.EdqsStateService; + +@RestController +@RequiredArgsConstructor +@RequestMapping("/api/edqs") +public class EdqsController { + + private final EdqsStateService edqsStateService; + + @GetMapping("/ready") + public ResponseEntity isReady() { + if (edqsStateService.isReady()) { + return ResponseEntity.ok().build(); + } else { + return ResponseEntity.badRequest().build(); + } + } + +} diff --git a/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java b/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java index 044020023d..2f5185728a 100644 --- a/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java +++ b/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java @@ -18,6 +18,7 @@ package org.thingsboard.server.edqs; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @@ -27,6 +28,7 @@ import java.util.Arrays; @SpringBootConfiguration @EnableAsync @EnableScheduling +@EnableAutoConfiguration @ComponentScan({"org.thingsboard.server.edqs", "org.thingsboard.server.queue.edqs", "org.thingsboard.server.queue.discovery", "org.thingsboard.server.queue.kafka", "org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment", "org.thingsboard.server.common.stats"}) @Slf4j @@ -39,71 +41,6 @@ public class ThingsboardEdqsApplication { SpringApplication.run(ThingsboardEdqsApplication.class, updateArguments(args)); } - // @Bean -// public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) { -// return args -> { -// long startTs = System.currentTimeMillis(); -// var loader = new TenantRepoLoader(new TenantRepo(TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60")))); -// loader.load(); -// log.info("Loaded all in {} ms", System.currentTimeMillis() - startTs); - - - -// log.info("Compressed {} strings/json, Before: {}, After: {}", -// CompressedStringDataPoint.cnt.get(), -// CompressedStringDataPoint.uncompressedLength.get(), -// CompressedStringDataPoint.compressedLength.get()); -// -// log.info("Deduplicated {} short and {} long strings", -// TbStringPool.size(), TbBytePool.size()); -// -// var tenantId = TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60")); -// var customerId = new CustomerId(UUID.fromString("fcbf2f50-d0d9-11ea-bea3-177755191a6e")); -// System.gc(); -// -// while (true) { -// EntityTypeFilter filter = new EntityTypeFilter(); -// filter.setEntityType(EntityType.DEVICE); -// var pageLink = new EntityDataPageLink(20, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.TIME_SERIES, "state"), EntityDataSortOrder.Direction.DESC), false); -// -// var entityFields = Arrays.asList(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime")); -// var latestValues = Arrays.asList(new EntityKey(EntityKeyType.TIME_SERIES, "state")); -// KeyFilter nameFilter = new KeyFilter(); -// nameFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, "name")); -// var predicate = new StringFilterPredicate(); -// predicate.setIgnoreCase(false); -// predicate.setOperation(StringFilterPredicate.StringOperation.CONTAINS); -// predicate.setValue(new FilterPredicateValue<>("LoRa-")); -// nameFilter.setPredicate(predicate); -// nameFilter.setValueType(EntityKeyValueType.STRING); -// -// EntityDataQuery edq = new EntityDataQuery(filter, pageLink, entityFields, latestValues, Arrays.asList(nameFilter)); -// var result = edqRepository.findEntityDataByQuery(tenantId, customerId, RepositoryUtils.ALL_READ_PERMISSIONS, edq, false); -// log.info("Device count: {}", result.getTotalElements()); -// log.info("First: {}", result.getData().get(0).getEntityId()); -// log.info("Last: {}", result.getData().get(19).getEntityId()); -// -// pageLink.setSortOrder(new EntityDataSortOrder(new EntityKey(EntityKeyType.TIME_SERIES, "state"), EntityDataSortOrder.Direction.ASC)); -// result = edqRepository.findEntityDataByQuery(tenantId, customerId, RepositoryUtils.ALL_READ_PERMISSIONS, edq, false); -// log.info("Device count: {}", result.getTotalElements()); -// log.info("First: {}", result.getData().get(0).getEntityId()); -// log.info("Last: {}", result.getData().get(19).getEntityId()); -// -// result.getData().forEach(data -> { -// System.err.println(data.getEntityId() + ":"); -// data.getLatest().forEach((type, values) -> { -// System.err.println(type); -// values.forEach((key, tsValue) -> { -// System.err.println(key + " = " + tsValue.getValue()); -// }); -// }); -// System.err.println(); -// }); -// Thread.sleep(5000); -// } -// }; -// } - private static String[] updateArguments(String[] args) { if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) { String[] modifiedArgs = new String[args.length + 1]; diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index fa09da1cdd..3d30c5c6fa 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -14,6 +14,12 @@ # limitations under the License. # +server: + # Server bind-address + address: "${HTTP_BIND_ADDRESS:0.0.0.0}" + # Server bind port + port: "${HTTP_BIND_PORT:8080}" + # Application info parameters app: # Application version @@ -38,10 +44,9 @@ zk: # The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability. recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}" -# SQL DAO Configuration parameters spring: main: - web-application-type: "none" + allow-circular-references: "true" # Spring Boot configuration property that controls whether circular dependencies between beans are allowed. # Queue configuration parameters queue: @@ -66,7 +71,7 @@ queue: # Enable/disable statistics for EDQS enabled: "${TB_EDQS_STATS_ENABLED:true}" # Statistics printing interval for EDQS - print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}" + print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}" kafka: # Kafka Bootstrap nodes in "host:port" format @@ -137,9 +142,9 @@ queue: - key: max.poll.interval.ms # Example of specific consumer properties value per topic for VC value: "${TB_QUEUE_KAFKA_VC_MAX_POLL_INTERVAL_MS:600000}" - # tb_rule_engine.sq: - # - key: max.poll.records - # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" + # tb_rule_engine.sq: + # - key: max.poll.records + # value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}" tb_housekeeper: # Consumer properties for Housekeeper tasks topic - key: max.poll.records @@ -209,133 +214,6 @@ queue: request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}" # Interval in milliseconds to poll api response from transport microservices response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" - core: - # Default topic name of Kafka, RabbitMQ, etc. queue - topic: "${TB_QUEUE_CORE_TOPIC:tb_core}" - # Interval in milliseconds to poll messages by Core microservices - poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" - # Amount of partitions used by Core microservices - partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" - # Timeout for processing a message pack by Core microservices - pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}" - # Enable/disable a separate consumer per partition for Core queue - consumer-per-partition: "${TB_QUEUE_CORE_CONSUMER_PER_PARTITION:true}" - ota: - # Default topic name for OTA updates - topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}" - # The interval of processing the OTA updates for devices. Used to avoid any harm to the network due to many parallel OTA updates - pack-interval-ms: "${TB_QUEUE_CORE_OTA_PACK_INTERVAL_MS:60000}" - # The size of OTA updates notifications fetched from the queue. The queue stores pairs of firmware and device ids - pack-size: "${TB_QUEUE_CORE_OTA_PACK_SIZE:100}" - # Stats topic name for queue Kafka, RabbitMQ, etc. - usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}" - stats: - # Enable/disable statistics for Core microservices - enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}" - # Statistics printing interval for Core microservices - print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}" - housekeeper: - # Topic name for Housekeeper tasks - topic: "${TB_HOUSEKEEPER_TOPIC:tb_housekeeper}" - # Topic name for Housekeeper tasks to be reprocessed - reprocessing-topic: "${TB_HOUSEKEEPER_REPROCESSING_TOPIC:tb_housekeeper.reprocessing}" - # Poll interval for topics related to Housekeeper - poll-interval-ms: "${TB_HOUSEKEEPER_POLL_INTERVAL_MS:500}" - # Timeout in milliseconds for task processing. Tasks that fail to finish on time will be submitted for reprocessing - task-processing-timeout-ms: "${TB_HOUSEKEEPER_TASK_PROCESSING_TIMEOUT_MS:120000}" - # Comma-separated list of task types that shouldn't be processed. Available task types: - # DELETE_ATTRIBUTES, DELETE_TELEMETRY (both DELETE_LATEST_TS and DELETE_TS_HISTORY will be disabled), - # DELETE_LATEST_TS, DELETE_TS_HISTORY, DELETE_EVENTS, DELETE_ALARMS, UNASSIGN_ALARMS - disabled-task-types: "${TB_HOUSEKEEPER_DISABLED_TASK_TYPES:}" - # Delay in milliseconds between tasks reprocessing - task-reprocessing-delay-ms: "${TB_HOUSEKEEPER_TASK_REPROCESSING_DELAY_MS:3000}" - # Maximum amount of task reprocessing attempts. After exceeding, the task will be dropped - max-reprocessing-attempts: "${TB_HOUSEKEEPER_MAX_REPROCESSING_ATTEMPTS:10}" - stats: - # Enable/disable statistics for Housekeeper - enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}" - # Statistics printing interval for Housekeeper - print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}" - - vc: - # Default topic name for Kafka, RabbitMQ, etc. - topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" - # Number of partitions to associate with this queue. Used for scaling the number of messages that can be processed in parallel - partitions: "${TB_QUEUE_VC_PARTITIONS:10}" - # Interval in milliseconds between polling of the messages if no new messages arrive - poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}" - # Timeout before retrying all failed and timed-out messages from the processing pack - pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:180000}" - # Timeout for a request to VC-executor (for a request for the version of the entity, for a commit charge, etc.) - request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:180000}" - # Queue settings for Kafka, RabbitMQ, etc. Limit for single message size - msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:250000}" - js: - # JS Eval request topic - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" - # JS Eval responses topic prefix that is combined with node id - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}" - # JS Eval max pending requests - max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" - # JS Eval max request timeout - max_eval_requests_timeout: "${REMOTE_JS_MAX_EVAL_REQUEST_TIMEOUT:60000}" - # JS max request timeout - max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}" - # JS execution max request timeout - max_exec_requests_timeout: "${REMOTE_JS_MAX_EXEC_REQUEST_TIMEOUT:2000}" - # JS response poll interval - response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}" - rule-engine: - # Deprecated. It will be removed in the nearest releases - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}" - # Interval in milliseconds to poll messages by Rule Engine - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - # Timeout for processing a message pack of Rule Engine - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:2000}" - stats: - # Enable/disable statistics for Rule Engine - enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" - # Statistics printing interval for Rule Engine - print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}" - # Max length of the error message that is printed by statistics - max-error-message-length: "${TB_QUEUE_RULE_ENGINE_MAX_ERROR_MESSAGE_LENGTH:4096}" - # After a queue is deleted (or the profile's isolation option was disabled), Rule Engine will continue reading related topics during this period before deleting the actual topics - topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}" - # Size of the thread pool that handles such operations as partition changes, config updates, queue deletion - management-thread-pool-size: "${TB_QUEUE_RULE_ENGINE_MGMT_THREAD_POOL_SIZE:12}" - transport: - # For high-priority notifications that require minimum latency and processing time - notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" - # Interval in milliseconds to poll messages - poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}" - integration: - # Name of hash function used for consistent hash ring in Cluster Mode. See architecture docs for more details. Valid values - murmur3_32, murmur3_128 or sha256 - partitions: "${TB_QUEUE_INTEGRATION_PARTITIONS:3}" - # Default notification topic name used by queue - notifications_topic: "${TB_QUEUE_INTEGRATION_NOTIFICATIONS_TOPIC:tb_ie.notifications}" - # Default downlink topic name used by queue - downlink_topic: "${TB_QUEUE_INTEGRATION_DOWNLINK_TOPIC:tb_ie.downlink}" - # Default uplink topic name used by queue - uplink_topic: "${TB_QUEUE_INTEGRATION_UPLINK_TOPIC:tb_ie.uplink}" - # Interval in milliseconds to poll messages by integrations - poll_interval: "${TB_QUEUE_INTEGRATION_POLL_INTERVAL_MS:25}" - # Timeout for processing a message pack by integrations - pack-processing-timeout: "${TB_QUEUE_INTEGRATION_PACK_PROCESSING_TIMEOUT_MS:10000}" - integration_api: - # Default Integration Api request topic name used by queue - requests_topic: "${TB_QUEUE_INTEGRATION_EXECUTOR_API_REQUEST_TOPIC:tb_ie.api.requests}" - # Default Integration Api response topic name used by queue - responses_topic: "${TB_QUEUE_INTEGRATION_EXECUTOR_API_RESPONSE_TOPIC:tb_ie.api.responses}" - # Maximum pending api requests from integration executor to be handled by server< - max_pending_requests: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_PENDING_REQUESTS:10000}" - # Maximum timeout in milliseconds to handle api request from integration executor microservice by server - max_requests_timeout: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_REQUEST_TIMEOUT:10000}" - # Amount of threads used to invoke callbacks - max_callback_threads: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_CALLBACK_THREADS:10}" - # Interval in milliseconds to poll api requests from integration executor microservices - request_poll_interval: "${TB_QUEUE_INTEGRATION_EXECUTOR_REQUEST_POLL_INTERVAL_MS:25}" - # Interval in milliseconds to poll api response from integration executor microservices - response_poll_interval: "${TB_QUEUE_INTEGRATION_EXECUTOR_RESPONSE_POLL_INTERVAL_MS:25}" # General service parameters service: @@ -343,7 +221,8 @@ service: # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" edqs: - label: "${TB_EDQS_LABEL:}" # services with the same label will share the list of partitions + # EDQS instances with the same label will share the same list of partitions + label: "${TB_EDQS_LABEL:}" # Metrics parameters metrics: diff --git a/edqs/src/main/resources/logback.xml b/edqs/src/main/resources/logback.xml index d96cb1a7d3..0aedb9d092 100644 --- a/edqs/src/main/resources/logback.xml +++ b/edqs/src/main/resources/logback.xml @@ -26,6 +26,7 @@ +