Multiple improvements for EDQS
This commit is contained in:
parent
347cb5bc36
commit
965210f17b
@ -1696,14 +1696,14 @@ queue:
|
||||
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
edqs:
|
||||
sync:
|
||||
# Enable/disable EDQS synchronization FIXME: disable by default before release
|
||||
enabled: "${TB_EDQS_SYNC_ENABLED:true}"
|
||||
# Enable/disable EDQS synchronization
|
||||
enabled: "${TB_EDQS_SYNC_ENABLED:false}"
|
||||
# Batch size of entities being synced with EDQS
|
||||
entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}"
|
||||
# Batch size of timeseries data being synced with EDQS
|
||||
ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}"
|
||||
# Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation) FIXME: disable by default before release
|
||||
api_enabled: "${TB_EDQS_API_ENABLED:true}"
|
||||
# Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation)
|
||||
api_enabled: "${TB_EDQS_API_ENABLED:false}"
|
||||
# Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices)
|
||||
mode: "${TB_EDQS_MODE:local}"
|
||||
local:
|
||||
@ -1727,7 +1727,7 @@ queue:
|
||||
# Enable/disable statistics for EDQS
|
||||
enabled: "${TB_EDQS_STATS_ENABLED:true}"
|
||||
# Statistics printing interval for EDQS
|
||||
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}"
|
||||
|
||||
vc:
|
||||
# Default topic name
|
||||
|
||||
@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.query.EntityData;
|
||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.edqs.state.EdqsStateService;
|
||||
import org.thingsboard.server.edqs.util.EdqsRocksDb;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -34,7 +35,7 @@ import static org.awaitility.Awaitility.await;
|
||||
@DaoSqlTest
|
||||
@TestPropertySource(properties = {
|
||||
// "queue.type=kafka", // uncomment to use Kafka
|
||||
// "queue.kafka.bootstrap.servers=192.168.0.105:9092",
|
||||
// "queue.kafka.bootstrap.servers=10.7.1.254:9092",
|
||||
"queue.edqs.sync.enabled=true",
|
||||
"queue.edqs.api_enabled=true",
|
||||
"queue.edqs.mode=local"
|
||||
@ -44,12 +45,15 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
|
||||
@Autowired
|
||||
private EdqsService edqsService;
|
||||
|
||||
@Autowired(required = false)
|
||||
private EdqsStateService edqsStateService;
|
||||
|
||||
@MockBean // so that we don't do backup for tests
|
||||
private EdqsRocksDb edqsRocksDb;
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled());
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled() && edqsStateService.isReady());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -24,9 +24,7 @@ import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.ExceptionUtil;
|
||||
@ -90,8 +88,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
private final EdqsConfig config;
|
||||
private final EdqsPartitionService partitionService;
|
||||
private final ConfigurableApplicationContext applicationContext;
|
||||
@Autowired @Lazy
|
||||
private EdqsStateService stateService;
|
||||
private final EdqsStateService stateService;
|
||||
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
|
||||
private TbQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
|
||||
|
||||
@ -105,7 +105,7 @@ public class TenantRepo {
|
||||
|
||||
public void processEvent(EdqsEvent event) {
|
||||
EdqsObject edqsObject = event.getObject();
|
||||
log.debug("[{}] Processing event: {}", tenantId, event);
|
||||
log.trace("[{}] Processing event: {}", tenantId, event);
|
||||
if (event.getEventType() == EdqsEventType.UPDATED) {
|
||||
addOrUpdate(edqsObject);
|
||||
} else if (event.getEventType() == EdqsEventType.DELETED) {
|
||||
|
||||
@ -33,6 +33,8 @@ public interface EdqsStateService {
|
||||
|
||||
void save(TenantId tenantId, ObjectType type, String key, EdqsEventType eventType, ToEdqsMsg msg);
|
||||
|
||||
boolean isReady();
|
||||
|
||||
void stop();
|
||||
|
||||
}
|
||||
|
||||
@ -17,6 +17,8 @@ package org.thingsboard.server.edqs.state;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsEventType;
|
||||
@ -52,8 +54,9 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
private final EdqsConfig config;
|
||||
private final EdqsPartitionService partitionService;
|
||||
private final EdqsQueueFactory queueFactory;
|
||||
private final EdqsProcessor edqsProcessor;
|
||||
private final TopicService topicService;
|
||||
@Autowired @Lazy
|
||||
private EdqsProcessor edqsProcessor;
|
||||
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> stateConsumer;
|
||||
private QueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> queueStateService;
|
||||
@ -63,6 +66,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
private final VersionsStore versionsStore = new VersionsStore();
|
||||
private final AtomicInteger stateReadCount = new AtomicInteger();
|
||||
private final AtomicInteger eventsReadCount = new AtomicInteger();
|
||||
private Boolean ready;
|
||||
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer) {
|
||||
@ -72,9 +76,6 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
.pollInterval(config.getPollInterval())
|
||||
.msgPackProcessor((msgs, consumer, config) -> {
|
||||
for (TbProtoQueueMsg<ToEdqsMsg> queueMsg : msgs) {
|
||||
if (consumer.isStopped()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ToEdqsMsg msg = queueMsg.getValue();
|
||||
edqsProcessor.process(msg, EdqsQueue.STATE);
|
||||
@ -124,7 +125,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
TenantId tenantId = getTenantId(msg);
|
||||
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
|
||||
EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType());
|
||||
log.debug("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key);
|
||||
log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key);
|
||||
stateProducer.send(tenantId, objectType, key, msg);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
@ -160,6 +161,18 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
// do nothing here, backup is done by events consumer
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
if (ready == null) {
|
||||
Set<TopicPartitionInfo> partitionsInProgress = queueStateService.getPartitionsInProgress();
|
||||
if (partitionsInProgress != null && partitionsInProgress.isEmpty()) {
|
||||
ready = true; // once true - always true, not to change readiness status on each repartitioning
|
||||
}
|
||||
}
|
||||
log.error("ready: {}", ready);
|
||||
return ready != null && ready;
|
||||
}
|
||||
|
||||
private TenantId getTenantId(ToEdqsMsg edqsMsg) {
|
||||
return TenantId.fromUUID(new UUID(edqsMsg.getTenantIdMSB(), edqsMsg.getTenantIdLSB()));
|
||||
}
|
||||
|
||||
@ -80,6 +80,11 @@ public class LocalEdqsStateService implements EdqsStateService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
return partitions != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
@ -43,9 +43,12 @@ public class EdqsStatsService {
|
||||
private final ConcurrentHashMap<TenantId, EdqsStats> statsMap = new ConcurrentHashMap<>();
|
||||
private final StatsFactory statsFactory;
|
||||
|
||||
@Scheduled(initialDelayString = "${queue.edqs.stats.print-interval-ms:60000}",
|
||||
fixedDelayString = "${queue.edqs.stats.print-interval-ms:60000}")
|
||||
@Scheduled(initialDelayString = "${queue.edqs.stats.print-interval-ms:300000}",
|
||||
fixedDelayString = "${queue.edqs.stats.print-interval-ms:300000}")
|
||||
private void reportStats() {
|
||||
if (statsMap.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
String values = statsMap.entrySet().stream()
|
||||
.map(kv -> "TenantId [" + kv.getKey() + "] stats [" + kv.getValue() + "]")
|
||||
.collect(Collectors.joining(System.lineSeparator()));
|
||||
|
||||
@ -17,13 +17,11 @@ package org.thingsboard.server.common.msg.queue;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
@ToString
|
||||
public class TopicPartitionInfo {
|
||||
|
||||
private final String topic;
|
||||
@ -97,4 +95,13 @@ public class TopicPartitionInfo {
|
||||
return Objects.hash(fullTopicName, partition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String str = fullTopicName;
|
||||
if (useInternalPartition) {
|
||||
str += "[" + partition + "]";
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
|
||||
@ -270,12 +269,6 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
|
||||
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
|
||||
}
|
||||
|
||||
static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
|
||||
return partitions.stream().map(tpi -> tpi.getFullTopicName() + (tpi.isUseInternalPartition() ?
|
||||
"[" + tpi.getPartition().orElse(-1) + "]" : ""))
|
||||
.collect(Collectors.joining(", ", "[", "]"));
|
||||
}
|
||||
|
||||
public interface MsgPackProcessor<M extends TbQueueMsg, C extends QueueConfig> {
|
||||
void process(List<M> msgs, TbQueueConsumer<M> consumer, C config) throws Exception;
|
||||
}
|
||||
@ -299,7 +292,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
|
||||
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
|
||||
removedPartitions.removeAll(partitions);
|
||||
|
||||
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions));
|
||||
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, addedPartitions, removedPartitions);
|
||||
removePartitions(removedPartitions);
|
||||
addPartitions(addedPartitions, null);
|
||||
}
|
||||
@ -333,7 +326,7 @@ public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfi
|
||||
|
||||
@Override
|
||||
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
|
||||
log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions));
|
||||
log.info("[{}] New partitions: {}", queueKey, partitions);
|
||||
if (partitions.isEmpty()) {
|
||||
if (consumer != null && consumer.isRunning()) {
|
||||
consumer.initiateStop();
|
||||
|
||||
@ -52,10 +52,10 @@ public class PartitionedQueueConsumerManager<M extends TbQueueMsg> extends MainQ
|
||||
@Override
|
||||
protected void processTask(TbQueueConsumerManagerTask task) {
|
||||
if (task instanceof AddPartitionsTask addPartitionsTask) {
|
||||
log.info("[{}] Added partitions: {}", queueKey, partitionsToString(addPartitionsTask.partitions()));
|
||||
log.info("[{}] Added partitions: {}", queueKey, addPartitionsTask.partitions());
|
||||
consumerWrapper.addPartitions(addPartitionsTask.partitions(), addPartitionsTask.onStop());
|
||||
} else if (task instanceof RemovePartitionsTask removePartitionsTask) {
|
||||
log.info("[{}] Removed partitions: {}", queueKey, partitionsToString(removePartitionsTask.partitions()));
|
||||
log.info("[{}] Removed partitions: {}", queueKey, removePartitionsTask.partitions());
|
||||
consumerWrapper.removePartitions(removePartitionsTask.partitions());
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,16 +16,19 @@
|
||||
package org.thingsboard.server.queue.common.consumer;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.TbQueueMsg;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
|
||||
private PartitionedQueueConsumerManager<S> stateConsumer;
|
||||
@ -33,6 +36,9 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
|
||||
@Getter
|
||||
private Set<TopicPartitionInfo> partitions;
|
||||
private final Set<TopicPartitionInfo> partitionsInProgress = ConcurrentHashMap.newKeySet();
|
||||
private boolean initialized;
|
||||
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
||||
public void init(PartitionedQueueConsumerManager<S> stateConsumer, PartitionedQueueConsumerManager<E> eventConsumer) {
|
||||
@ -62,9 +68,15 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
}
|
||||
|
||||
if (!addedPartitions.isEmpty()) {
|
||||
partitionsInProgress.addAll(addedPartitions);
|
||||
stateConsumer.addPartitions(addedPartitions, partition -> {
|
||||
lock.lock();
|
||||
try {
|
||||
partitionsInProgress.remove(partition);
|
||||
log.info("Finished partition {} (still in progress: {})", partition, partitionsInProgress);
|
||||
if (partitionsInProgress.isEmpty()) {
|
||||
log.info("All partitions processed");
|
||||
}
|
||||
if (this.partitions.contains(partition)) {
|
||||
eventConsumer.addPartitions(Set.of(partition.withTopic(eventConsumer.getTopic())));
|
||||
}
|
||||
@ -73,10 +85,15 @@ public class QueueStateService<E extends TbQueueMsg, S extends TbQueueMsg> {
|
||||
}
|
||||
});
|
||||
}
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
private Set<TopicPartitionInfo> withTopic(Set<TopicPartitionInfo> partitions, String topic) {
|
||||
return partitions.stream().map(tpi -> tpi.withTopic(topic)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public Set<TopicPartitionInfo> getPartitionsInProgress() {
|
||||
return initialized ? partitionsInProgress : null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
|
||||
@Slf4j
|
||||
public class HashPartitionService implements PartitionService {
|
||||
|
||||
@Value("${queue.core.topic}")
|
||||
@Value("${queue.core.topic:tb_core}")
|
||||
private String coreTopic;
|
||||
@Value("${queue.core.partitions:10}")
|
||||
private Integer corePartitions;
|
||||
|
||||
@ -21,7 +21,6 @@ import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
// TODO: tb-core ?
|
||||
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " +
|
||||
"(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " +
|
||||
"'${queue.edqs.mode:null}'=='local'))")
|
||||
|
||||
@ -54,6 +54,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
|
||||
|
||||
private final boolean readFromBeginning; // reset offset to beginning
|
||||
private final boolean stopWhenRead; // stop consuming when reached an empty msg pack
|
||||
private int readCount;
|
||||
private Map<Integer, Long> endOffsets; // needed if stopWhenRead is true
|
||||
|
||||
private boolean partitionsAssigned = false;
|
||||
@ -152,6 +153,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
|
||||
records.forEach(record -> {
|
||||
recordList.add(record);
|
||||
if (stopWhenRead) {
|
||||
readCount++;
|
||||
int partition = record.partition();
|
||||
Long endOffset = endOffsets.get(partition);
|
||||
if (endOffset == null) {
|
||||
@ -166,7 +168,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
|
||||
});
|
||||
}
|
||||
if (stopWhenRead && endOffsets.isEmpty()) {
|
||||
log.info("Reached end offset for {}, stopping consumer", consumer.assignment());
|
||||
log.info("Finished reading {}, processed {} messages", partitions, readCount);
|
||||
stop();
|
||||
}
|
||||
return recordList;
|
||||
|
||||
@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory {
|
||||
|
||||
private static final Counter STUB_COUNTER = new StubCounter();
|
||||
|
||||
@Autowired(required = false) // FIXME Slavik !!!
|
||||
@Autowired // FIXME Slavik !!!
|
||||
private MeterRegistry meterRegistry;
|
||||
|
||||
@Value("${metrics.enabled:false}")
|
||||
|
||||
@ -142,12 +142,12 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
||||
private EdqsResponse processEdqsRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
|
||||
EdqsResponse response;
|
||||
try {
|
||||
log.info("Sending request to EDQS: {}", request);
|
||||
log.debug("[{}] Sending request to EDQS: {}", tenantId, request);
|
||||
response = edqsService.processRequest(tenantId, customerId, request).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
log.info("Received response from EDQS: {}", response);
|
||||
log.debug("[{}] Received response from EDQS: {}", tenantId, response);
|
||||
if (response.getError() != null) {
|
||||
throw new RuntimeException(response.getError());
|
||||
}
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
package org.thingsboard.server.dao.sql.query;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
|
||||
@Service
|
||||
@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "false", matchIfMissing = true)
|
||||
@ConditionalOnMissingBean(value = EdqsService.class, ignored = DummyEdqsService.class)
|
||||
public class DummyEdqsService implements EdqsService {
|
||||
|
||||
@Override
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.edqs;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.thingsboard.server.edqs.state.EdqsStateService;
|
||||
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
@RequestMapping("/api/edqs")
|
||||
public class EdqsController {
|
||||
|
||||
private final EdqsStateService edqsStateService;
|
||||
|
||||
@GetMapping("/ready")
|
||||
public ResponseEntity<Void> isReady() {
|
||||
if (edqsStateService.isReady()) {
|
||||
return ResponseEntity.ok().build();
|
||||
} else {
|
||||
return ResponseEntity.badRequest().build();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.edqs;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.SpringBootConfiguration;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
@ -27,6 +28,7 @@ import java.util.Arrays;
|
||||
@SpringBootConfiguration
|
||||
@EnableAsync
|
||||
@EnableScheduling
|
||||
@EnableAutoConfiguration
|
||||
@ComponentScan({"org.thingsboard.server.edqs", "org.thingsboard.server.queue.edqs", "org.thingsboard.server.queue.discovery", "org.thingsboard.server.queue.kafka",
|
||||
"org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment", "org.thingsboard.server.common.stats"})
|
||||
@Slf4j
|
||||
@ -39,71 +41,6 @@ public class ThingsboardEdqsApplication {
|
||||
SpringApplication.run(ThingsboardEdqsApplication.class, updateArguments(args));
|
||||
}
|
||||
|
||||
// @Bean
|
||||
// public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) {
|
||||
// return args -> {
|
||||
// long startTs = System.currentTimeMillis();
|
||||
// var loader = new TenantRepoLoader(new TenantRepo(TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60"))));
|
||||
// loader.load();
|
||||
// log.info("Loaded all in {} ms", System.currentTimeMillis() - startTs);
|
||||
|
||||
|
||||
|
||||
// log.info("Compressed {} strings/json, Before: {}, After: {}",
|
||||
// CompressedStringDataPoint.cnt.get(),
|
||||
// CompressedStringDataPoint.uncompressedLength.get(),
|
||||
// CompressedStringDataPoint.compressedLength.get());
|
||||
//
|
||||
// log.info("Deduplicated {} short and {} long strings",
|
||||
// TbStringPool.size(), TbBytePool.size());
|
||||
//
|
||||
// var tenantId = TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60"));
|
||||
// var customerId = new CustomerId(UUID.fromString("fcbf2f50-d0d9-11ea-bea3-177755191a6e"));
|
||||
// System.gc();
|
||||
//
|
||||
// while (true) {
|
||||
// EntityTypeFilter filter = new EntityTypeFilter();
|
||||
// filter.setEntityType(EntityType.DEVICE);
|
||||
// var pageLink = new EntityDataPageLink(20, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.TIME_SERIES, "state"), EntityDataSortOrder.Direction.DESC), false);
|
||||
//
|
||||
// var entityFields = Arrays.asList(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime"));
|
||||
// var latestValues = Arrays.asList(new EntityKey(EntityKeyType.TIME_SERIES, "state"));
|
||||
// KeyFilter nameFilter = new KeyFilter();
|
||||
// nameFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"));
|
||||
// var predicate = new StringFilterPredicate();
|
||||
// predicate.setIgnoreCase(false);
|
||||
// predicate.setOperation(StringFilterPredicate.StringOperation.CONTAINS);
|
||||
// predicate.setValue(new FilterPredicateValue<>("LoRa-"));
|
||||
// nameFilter.setPredicate(predicate);
|
||||
// nameFilter.setValueType(EntityKeyValueType.STRING);
|
||||
//
|
||||
// EntityDataQuery edq = new EntityDataQuery(filter, pageLink, entityFields, latestValues, Arrays.asList(nameFilter));
|
||||
// var result = edqRepository.findEntityDataByQuery(tenantId, customerId, RepositoryUtils.ALL_READ_PERMISSIONS, edq, false);
|
||||
// log.info("Device count: {}", result.getTotalElements());
|
||||
// log.info("First: {}", result.getData().get(0).getEntityId());
|
||||
// log.info("Last: {}", result.getData().get(19).getEntityId());
|
||||
//
|
||||
// pageLink.setSortOrder(new EntityDataSortOrder(new EntityKey(EntityKeyType.TIME_SERIES, "state"), EntityDataSortOrder.Direction.ASC));
|
||||
// result = edqRepository.findEntityDataByQuery(tenantId, customerId, RepositoryUtils.ALL_READ_PERMISSIONS, edq, false);
|
||||
// log.info("Device count: {}", result.getTotalElements());
|
||||
// log.info("First: {}", result.getData().get(0).getEntityId());
|
||||
// log.info("Last: {}", result.getData().get(19).getEntityId());
|
||||
//
|
||||
// result.getData().forEach(data -> {
|
||||
// System.err.println(data.getEntityId() + ":");
|
||||
// data.getLatest().forEach((type, values) -> {
|
||||
// System.err.println(type);
|
||||
// values.forEach((key, tsValue) -> {
|
||||
// System.err.println(key + " = " + tsValue.getValue());
|
||||
// });
|
||||
// });
|
||||
// System.err.println();
|
||||
// });
|
||||
// Thread.sleep(5000);
|
||||
// }
|
||||
// };
|
||||
// }
|
||||
|
||||
private static String[] updateArguments(String[] args) {
|
||||
if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
|
||||
String[] modifiedArgs = new String[args.length + 1];
|
||||
|
||||
@ -14,6 +14,12 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
server:
|
||||
# Server bind-address
|
||||
address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
|
||||
# Server bind port
|
||||
port: "${HTTP_BIND_PORT:8080}"
|
||||
|
||||
# Application info parameters
|
||||
app:
|
||||
# Application version
|
||||
@ -38,10 +44,9 @@ zk:
|
||||
# The delay is recommended because the initialization of rule chain actors is time-consuming. Avoiding unnecessary recalculations during a restart can enhance system performance and stability.
|
||||
recalculate_delay: "${ZOOKEEPER_RECALCULATE_DELAY_MS:0}"
|
||||
|
||||
# SQL DAO Configuration parameters
|
||||
spring:
|
||||
main:
|
||||
web-application-type: "none"
|
||||
allow-circular-references: "true" # Spring Boot configuration property that controls whether circular dependencies between beans are allowed.
|
||||
|
||||
# Queue configuration parameters
|
||||
queue:
|
||||
@ -66,7 +71,7 @@ queue:
|
||||
# Enable/disable statistics for EDQS
|
||||
enabled: "${TB_EDQS_STATS_ENABLED:true}"
|
||||
# Statistics printing interval for EDQS
|
||||
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}"
|
||||
|
||||
kafka:
|
||||
# Kafka Bootstrap nodes in "host:port" format
|
||||
@ -137,9 +142,9 @@ queue:
|
||||
- key: max.poll.interval.ms
|
||||
# Example of specific consumer properties value per topic for VC
|
||||
value: "${TB_QUEUE_KAFKA_VC_MAX_POLL_INTERVAL_MS:600000}"
|
||||
# tb_rule_engine.sq:
|
||||
# - key: max.poll.records
|
||||
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
|
||||
# tb_rule_engine.sq:
|
||||
# - key: max.poll.records
|
||||
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
|
||||
tb_housekeeper:
|
||||
# Consumer properties for Housekeeper tasks topic
|
||||
- key: max.poll.records
|
||||
@ -209,133 +214,6 @@ queue:
|
||||
request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
|
||||
# Interval in milliseconds to poll api response from transport microservices
|
||||
response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
|
||||
core:
|
||||
# Default topic name of Kafka, RabbitMQ, etc. queue
|
||||
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
|
||||
# Interval in milliseconds to poll messages by Core microservices
|
||||
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
|
||||
# Amount of partitions used by Core microservices
|
||||
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
|
||||
# Timeout for processing a message pack by Core microservices
|
||||
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
|
||||
# Enable/disable a separate consumer per partition for Core queue
|
||||
consumer-per-partition: "${TB_QUEUE_CORE_CONSUMER_PER_PARTITION:true}"
|
||||
ota:
|
||||
# Default topic name for OTA updates
|
||||
topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}"
|
||||
# The interval of processing the OTA updates for devices. Used to avoid any harm to the network due to many parallel OTA updates
|
||||
pack-interval-ms: "${TB_QUEUE_CORE_OTA_PACK_INTERVAL_MS:60000}"
|
||||
# The size of OTA updates notifications fetched from the queue. The queue stores pairs of firmware and device ids
|
||||
pack-size: "${TB_QUEUE_CORE_OTA_PACK_SIZE:100}"
|
||||
# Stats topic name for queue Kafka, RabbitMQ, etc.
|
||||
usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}"
|
||||
stats:
|
||||
# Enable/disable statistics for Core microservices
|
||||
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}"
|
||||
# Statistics printing interval for Core microservices
|
||||
print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
housekeeper:
|
||||
# Topic name for Housekeeper tasks
|
||||
topic: "${TB_HOUSEKEEPER_TOPIC:tb_housekeeper}"
|
||||
# Topic name for Housekeeper tasks to be reprocessed
|
||||
reprocessing-topic: "${TB_HOUSEKEEPER_REPROCESSING_TOPIC:tb_housekeeper.reprocessing}"
|
||||
# Poll interval for topics related to Housekeeper
|
||||
poll-interval-ms: "${TB_HOUSEKEEPER_POLL_INTERVAL_MS:500}"
|
||||
# Timeout in milliseconds for task processing. Tasks that fail to finish on time will be submitted for reprocessing
|
||||
task-processing-timeout-ms: "${TB_HOUSEKEEPER_TASK_PROCESSING_TIMEOUT_MS:120000}"
|
||||
# Comma-separated list of task types that shouldn't be processed. Available task types:
|
||||
# DELETE_ATTRIBUTES, DELETE_TELEMETRY (both DELETE_LATEST_TS and DELETE_TS_HISTORY will be disabled),
|
||||
# DELETE_LATEST_TS, DELETE_TS_HISTORY, DELETE_EVENTS, DELETE_ALARMS, UNASSIGN_ALARMS
|
||||
disabled-task-types: "${TB_HOUSEKEEPER_DISABLED_TASK_TYPES:}"
|
||||
# Delay in milliseconds between tasks reprocessing
|
||||
task-reprocessing-delay-ms: "${TB_HOUSEKEEPER_TASK_REPROCESSING_DELAY_MS:3000}"
|
||||
# Maximum amount of task reprocessing attempts. After exceeding, the task will be dropped
|
||||
max-reprocessing-attempts: "${TB_HOUSEKEEPER_MAX_REPROCESSING_ATTEMPTS:10}"
|
||||
stats:
|
||||
# Enable/disable statistics for Housekeeper
|
||||
enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}"
|
||||
# Statistics printing interval for Housekeeper
|
||||
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
|
||||
vc:
|
||||
# Default topic name for Kafka, RabbitMQ, etc.
|
||||
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
|
||||
# Number of partitions to associate with this queue. Used for scaling the number of messages that can be processed in parallel
|
||||
partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
|
||||
# Interval in milliseconds between polling of the messages if no new messages arrive
|
||||
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
|
||||
# Timeout before retrying all failed and timed-out messages from the processing pack
|
||||
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:180000}"
|
||||
# Timeout for a request to VC-executor (for a request for the version of the entity, for a commit charge, etc.)
|
||||
request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:180000}"
|
||||
# Queue settings for Kafka, RabbitMQ, etc. Limit for single message size
|
||||
msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:250000}"
|
||||
js:
|
||||
# JS Eval request topic
|
||||
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
|
||||
# JS Eval responses topic prefix that is combined with node id
|
||||
response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}"
|
||||
# JS Eval max pending requests
|
||||
max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
|
||||
# JS Eval max request timeout
|
||||
max_eval_requests_timeout: "${REMOTE_JS_MAX_EVAL_REQUEST_TIMEOUT:60000}"
|
||||
# JS max request timeout
|
||||
max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
|
||||
# JS execution max request timeout
|
||||
max_exec_requests_timeout: "${REMOTE_JS_MAX_EXEC_REQUEST_TIMEOUT:2000}"
|
||||
# JS response poll interval
|
||||
response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
|
||||
rule-engine:
|
||||
# Deprecated. It will be removed in the nearest releases
|
||||
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb_rule_engine}"
|
||||
# Interval in milliseconds to poll messages by Rule Engine
|
||||
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
|
||||
# Timeout for processing a message pack of Rule Engine
|
||||
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:2000}"
|
||||
stats:
|
||||
# Enable/disable statistics for Rule Engine
|
||||
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
|
||||
# Statistics printing interval for Rule Engine
|
||||
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
# Max length of the error message that is printed by statistics
|
||||
max-error-message-length: "${TB_QUEUE_RULE_ENGINE_MAX_ERROR_MESSAGE_LENGTH:4096}"
|
||||
# After a queue is deleted (or the profile's isolation option was disabled), Rule Engine will continue reading related topics during this period before deleting the actual topics
|
||||
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}"
|
||||
# Size of the thread pool that handles such operations as partition changes, config updates, queue deletion
|
||||
management-thread-pool-size: "${TB_QUEUE_RULE_ENGINE_MGMT_THREAD_POOL_SIZE:12}"
|
||||
transport:
|
||||
# For high-priority notifications that require minimum latency and processing time
|
||||
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
|
||||
# Interval in milliseconds to poll messages
|
||||
poll_interval: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
|
||||
integration:
|
||||
# Name of hash function used for consistent hash ring in Cluster Mode. See architecture docs for more details. Valid values - murmur3_32, murmur3_128 or sha256
|
||||
partitions: "${TB_QUEUE_INTEGRATION_PARTITIONS:3}"
|
||||
# Default notification topic name used by queue
|
||||
notifications_topic: "${TB_QUEUE_INTEGRATION_NOTIFICATIONS_TOPIC:tb_ie.notifications}"
|
||||
# Default downlink topic name used by queue
|
||||
downlink_topic: "${TB_QUEUE_INTEGRATION_DOWNLINK_TOPIC:tb_ie.downlink}"
|
||||
# Default uplink topic name used by queue
|
||||
uplink_topic: "${TB_QUEUE_INTEGRATION_UPLINK_TOPIC:tb_ie.uplink}"
|
||||
# Interval in milliseconds to poll messages by integrations
|
||||
poll_interval: "${TB_QUEUE_INTEGRATION_POLL_INTERVAL_MS:25}"
|
||||
# Timeout for processing a message pack by integrations
|
||||
pack-processing-timeout: "${TB_QUEUE_INTEGRATION_PACK_PROCESSING_TIMEOUT_MS:10000}"
|
||||
integration_api:
|
||||
# Default Integration Api request topic name used by queue
|
||||
requests_topic: "${TB_QUEUE_INTEGRATION_EXECUTOR_API_REQUEST_TOPIC:tb_ie.api.requests}"
|
||||
# Default Integration Api response topic name used by queue
|
||||
responses_topic: "${TB_QUEUE_INTEGRATION_EXECUTOR_API_RESPONSE_TOPIC:tb_ie.api.responses}"
|
||||
# Maximum pending api requests from integration executor to be handled by server<
|
||||
max_pending_requests: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_PENDING_REQUESTS:10000}"
|
||||
# Maximum timeout in milliseconds to handle api request from integration executor microservice by server
|
||||
max_requests_timeout: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_REQUEST_TIMEOUT:10000}"
|
||||
# Amount of threads used to invoke callbacks
|
||||
max_callback_threads: "${TB_QUEUE_INTEGRATION_EXECUTOR_MAX_CALLBACK_THREADS:10}"
|
||||
# Interval in milliseconds to poll api requests from integration executor microservices
|
||||
request_poll_interval: "${TB_QUEUE_INTEGRATION_EXECUTOR_REQUEST_POLL_INTERVAL_MS:25}"
|
||||
# Interval in milliseconds to poll api response from integration executor microservices
|
||||
response_poll_interval: "${TB_QUEUE_INTEGRATION_EXECUTOR_RESPONSE_POLL_INTERVAL_MS:25}"
|
||||
|
||||
# General service parameters
|
||||
service:
|
||||
@ -343,7 +221,8 @@ service:
|
||||
# Unique id for this service (autogenerated if empty)
|
||||
id: "${TB_SERVICE_ID:}"
|
||||
edqs:
|
||||
label: "${TB_EDQS_LABEL:}" # services with the same label will share the list of partitions
|
||||
# EDQS instances with the same label will share the same list of partitions
|
||||
label: "${TB_EDQS_LABEL:}"
|
||||
|
||||
# Metrics parameters
|
||||
metrics:
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
</appender>
|
||||
|
||||
<logger name="org.thingsboard.server" level="INFO"/>
|
||||
<logger name="org.thingsboard.server.edqs" level="DEBUG"/>
|
||||
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
|
||||
<logger name="org.apache.kafka.clients" level="WARN"/>
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user