EDQS code cleanup

This commit is contained in:
ViacheslavKlimov 2025-02-20 17:34:28 +02:00
parent 92b83335c0
commit a48144d2bb
11 changed files with 48 additions and 256 deletions

View File

@ -125,7 +125,6 @@ public class DefaultEdqsService implements EdqsService {
executor.submit(() -> {
try {
EdqsSyncState syncState = getSyncState();
// FIXME: Slavik smart events check
if (edqsSyncService.isSyncNeeded() || syncState == null || syncState.getStatus() != EdqsSyncStatus.FINISHED) {
if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) {
processSystemRequest(ToCoreEdqsRequest.builder()

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.edqs;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
@ -30,18 +29,16 @@ import java.util.Collections;
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'")
public class KafkaEdqsSyncService extends EdqsSyncService {
private final TbKafkaSettings kafkaSettings;
private TbKafkaAdmin kafkaAdmin;
private final boolean syncNeeded;
@PostConstruct
private void init() {
kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings) {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
this.syncNeeded = kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic());
}
@Override
public boolean isSyncNeeded() {
return kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic());
return syncNeeded;
}
}

View File

@ -1,224 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.sync.tenant;
import jakarta.annotation.PostConstruct;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.HasId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.edqs.LatestTsKv;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.dao.TenantEntityDao;
import org.thingsboard.server.dao.attributes.AttributesDao;
import org.thingsboard.server.dao.audit.AuditLogDao;
import org.thingsboard.server.dao.entity.EntityDaoRegistry;
import org.thingsboard.server.dao.event.EventDao;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.relation.RelationDao;
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.thingsboard.server.common.data.ObjectType.ATTRIBUTE_KV;
import static org.thingsboard.server.common.data.ObjectType.AUDIT_LOG;
import static org.thingsboard.server.common.data.ObjectType.EVENT;
import static org.thingsboard.server.common.data.ObjectType.LATEST_TS_KV;
import static org.thingsboard.server.common.data.ObjectType.RELATION;
import static org.thingsboard.server.common.data.ObjectType.TENANT;
@Service
@RequiredArgsConstructor
@Slf4j
public class TenantExportService {
private final EntityDaoRegistry entityDaoRegistry;
private final TenantDao tenantDao;
private final EventDao eventDao;
private final AuditLogDao auditLogDao;
private final AttributesDao attributesDao;
private final RelationDao relationDao;
private final TimeseriesLatestDao timeseriesLatestDao;
private final SqlPartitioningRepository partitioningRepository;
private Map<ObjectType, BiConsumer<TenantId, BiConsumer<ObjectType, Object>>> customExporters;
private Map<ObjectType, Exporter> relatedEntitiesExporters;
private static final Set<ObjectType> RELATED = EnumSet.of(EVENT, RELATION, ATTRIBUTE_KV, LATEST_TS_KV);
@PostConstruct
private void init() {
relatedEntitiesExporters = Map.of(
RELATION, this::exportRelations,
EVENT, this::exportEvents, // todo: query by tenant
ATTRIBUTE_KV, this::exportAttributes,
LATEST_TS_KV, this::exportLatestTelemetry
);
customExporters = Map.of(
AUDIT_LOG, this::exportAuditLogs
);
}
public void exportTenant(TenantId tenantId, ExportConfig config, BiConsumer<ObjectType, Object> processor) {
log.info("[{}] Exporting tenant", tenantId);
Tenant tenant = tenantDao.findById(TenantId.SYS_TENANT_ID, tenantId.getId());
if (tenant == null) {
throw new IllegalArgumentException("Tenant with id " + tenantId + " not found");
}
Set<ObjectType> objectTypes = config.getIncludedObjectTypes();
if (objectTypes.contains(TENANT)) {
exportEntity(tenantId, TENANT, tenant, config, processor);
}
for (ObjectType type : objectTypes) {
if (RELATED.contains(type) || type == TENANT) {
continue;
}
log.debug("[{}] Exporting {} entities", tenantId, type);
if (!customExporters.containsKey(type)) {
TenantEntityDao<?> dao = entityDaoRegistry.getTenantEntityDao(type);
var entities = new PageDataIterable<>(pageLink -> dao.findAllByTenantId(tenantId, pageLink), 100);
for (Object entity : entities) {
exportEntity(tenantId, type, entity, config, processor);
}
} else {
customExporters.get(type).accept(tenantId, processor);
}
}
}
private void exportEntity(TenantId tenantId, ObjectType type, Object entity, ExportConfig config, BiConsumer<ObjectType, Object> processor) {
processor.accept(type, entity);
if (entity instanceof HasId<?> hasId && hasId.getId() instanceof EntityId entityId) {
relatedEntitiesExporters.forEach((relatedEntityType, exporter) -> {
if (config.getIncludedObjectTypes().contains(relatedEntityType)) {
exporter.export(tenantId, entityId, processor);
}
});
}
}
private Map<Long, Long> getPartitions(String table) {
List<Long> partitionsStartTime = partitioningRepository.fetchPartitions(table).stream().sorted().toList();
if (partitionsStartTime.isEmpty()) {
return Collections.emptyMap();
}
Map<Long, Long> partitions = new HashMap<>();
for (int i = 0; i < partitionsStartTime.size(); i++) {
Long startTime = partitionsStartTime.get(i);
Long endTime;
if (partitionsStartTime.size() - 1 == i) {
endTime = System.currentTimeMillis();
} else {
endTime = partitionsStartTime.get(i + 1) - 1;
}
partitions.put(startTime, endTime);
}
return partitions;
}
private void exportAuditLogs(TenantId tenantId, BiConsumer<ObjectType, Object> processor) {
Map<Long, Long> partitions = getPartitions(ModelConstants.AUDIT_LOG_TABLE_NAME);
partitions.forEach((startTime, endTime) -> {
PageDataIterable<AuditLog> auditLogs = new PageDataIterable<>(pageLink -> {
return auditLogDao.findAuditLogsByTenantId(tenantId.getId(), null, new TimePageLink(pageLink, startTime, endTime));
}, 512);
for (AuditLog auditLog : auditLogs) {
processor.accept(AUDIT_LOG, auditLog);
}
});
}
private void exportAttributes(TenantId tenantId, EntityId entityId, BiConsumer<ObjectType, Object> processor) {
for (AttributeScope attributeScope : AttributeScope.values()) {
List<AttributeKvEntry> attributes = attributesDao.findAll(tenantId, entityId, attributeScope);
for (AttributeKvEntry entry : attributes) {
AttributeKv attributeKv = new AttributeKv(entityId, attributeScope, entry, entry.getVersion());
processor.accept(ATTRIBUTE_KV, attributeKv);
}
}
}
private void exportRelations(TenantId tenantId, EntityId entityId, BiConsumer<ObjectType, Object> processor) {
List<EntityRelation> relations = relationDao.findAllByFrom(tenantId, entityId);
for (EntityRelation relation : relations) {
processor.accept(RELATION, relation);
}
}
@SneakyThrows
private void exportLatestTelemetry(TenantId tenantId, EntityId entityId, BiConsumer<ObjectType, Object> processor) {
List<TsKvEntry> latestTelemetry = timeseriesLatestDao.findAllLatest(tenantId, entityId).get(30, TimeUnit.SECONDS);
for (TsKvEntry tsKvEntry : latestTelemetry) {
LatestTsKv latestTsKv = new LatestTsKv(entityId, tsKvEntry, tsKvEntry.getVersion());
processor.accept(LATEST_TS_KV, latestTsKv);
}
}
private void exportEvents(TenantId tenantId, EntityId entityId, BiConsumer<ObjectType, Object> processor) {
for (EventType eventType : EventType.values()) {
Map<Long, Long> partitions = getPartitions(eventType.getTable());
partitions.forEach((startTime, endTime) -> {
PageDataIterable<? extends Event> events = new PageDataIterable<>(pageLink -> {
return eventDao.findEvents(tenantId.getId(), entityId.getId(), eventType, new TimePageLink(pageLink, startTime, endTime));
}, 512);
for (Event event : events) {
processor.accept(EVENT, event);
}
});
}
}
private interface Exporter {
void export(TenantId tenantId, EntityId entityId, BiConsumer<ObjectType, Object> processor);
}
@Data
public static class ExportConfig {
private Set<ObjectType> includedObjectTypes;
}
}

View File

@ -28,7 +28,6 @@
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients" level="WARN"/>
<logger name="org.thingsboard.server.actors" level="WARN"/>
<!-- To enable the logging of scanned rule engine components-->
<!-- <logger name="org.thingsboard.server.service.component.AnnotationComponentDiscoveryService" level="DEBUG" />-->
<!-- To enable the debug logging of rule node upgrade -->

View File

@ -1696,22 +1696,35 @@ queue:
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
edqs:
sync:
enabled: "${TB_EDQS_SYNC_ENABLED:true}" # Enable/disable EDQS synchronization with postgres db FIXME: disable by default before release
entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}" # batch size of entities being synced with EDQS
ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}" # batch size of timeseries data being synced with EDQS
api_enabled: "${TB_EDQS_API_ENABLED:true}" # FIXME: disable by default before release
mode: "${TB_EDQS_MODE:local}" # local or remote
# Enable/disable EDQS synchronization FIXME: disable by default before release
enabled: "${TB_EDQS_SYNC_ENABLED:true}"
# Batch size of entities being synced with EDQS
entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}"
# Batch size of timeseries data being synced with EDQS
ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}"
# Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation) FIXME: disable by default before release
api_enabled: "${TB_EDQS_API_ENABLED:true}"
# Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices)
mode: "${TB_EDQS_MODE:local}"
local:
rocksdb_path: "${TB_EDQS_ROCKSDB_PATH:/tmp/edqs-backup}"
# Path to RocksDB for EDQS backup when running in local mode
rocksdb_path: "${TB_EDQS_ROCKSDB_PATH:${user.home}/.rocksdb/edqs}"
# Number of partitions for EDQS topics
partitions: "${TB_EDQS_PARTITIONS:12}"
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data
# EDQS partitioning strategy: tenant (partition is resolved by tenant id) or none (no specific strategy, resolving by message key)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic
responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}"
# Poll interval for EDQS topics
poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}"
# Maximum amount of pending requests to EDQS
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:10000}"
# Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
stats:
# Enable/disable statistics for EDQS service
# Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}"
# Statistics printing interval for EDQS
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}"

View File

@ -33,8 +33,8 @@ import static org.awaitility.Awaitility.await;
@DaoSqlTest
@TestPropertySource(properties = {
"queue.type=kafka", // uncomment to use Kafka
"queue.kafka.bootstrap.servers=192.168.0.105:9092",
// "queue.type=kafka", // uncomment to use Kafka
// "queue.kafka.bootstrap.servers=192.168.0.105:9092",
"queue.edqs.sync.enabled=true",
"queue.edqs.api_enabled=true",
"queue.edqs.mode=local"

View File

@ -53,7 +53,7 @@ sql.ttl.audit_logs.ttl=2592000
sql.edge_events.partition_size=168
sql.ttl.edge_events.edge_event_ttl=2592000
server.log_controller_error_stack_trace=true
server.log_controller_error_stack_trace=false
transport.gateway.dashboard.sync.enabled=false

View File

@ -9,7 +9,7 @@
<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
<logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.boot.test" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>

View File

@ -33,7 +33,7 @@ public class EdqsRocksDb extends TbRocksDb {
@Getter
private boolean isNew;
public EdqsRocksDb(@Value("${queue.edqs.local.rocksdb_path:${java.io.tmpdir}/edqs-backup}") String path) {
public EdqsRocksDb(@Value("${queue.edqs.local.rocksdb_path:${user.home}/.rocksdb/edqs}") String path) {
super(path, new Options().setCreateIfMissing(true));
}

View File

@ -38,7 +38,7 @@ public class EdqsConfig {
private long pollInterval;
@Value("${queue.edqs.max_pending_requests:10000}")
private int maxPendingRequests;
@Value("${queue.edqs.max_request_timeout:10000}")
@Value("${queue.edqs.max_request_timeout:20000}")
private int maxRequestTimeout;
public String getLabel() {

View File

@ -47,19 +47,27 @@ spring:
queue:
type: "${TB_QUEUE_TYPE:kafka}" # kafka (Apache Kafka)
prefix: "${TB_QUEUE_PREFIX:}" # Global queue prefix. If specified, prefix is added before default topic name: 'prefix.default_topic_name'. Prefix is applied to all topics (and consumer groups for kafka).
in_memory:
stats:
# For debug level
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
edqs:
mode: "${TB_EDQS_MODE:local}"
# Number of partitions for EDQS topics
partitions: "${TB_EDQS_PARTITIONS:12}"
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data
# EDQS partitioning strategy: tenant (partitions are resolved and distributed by tenant id) or none (partitions are resolved by message key; each instance has all the partitions)
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}"
# EDQS requests topic
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
# EDQS responses topic
responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}"
# Poll interval for EDQS topics
poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}"
# Maximum amount of pending requests to EDQS
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:10000}"
# Maximum timeout for requests to EDQS
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
stats:
# Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}"
# Statistics printing interval for EDQS
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:60000}"
kafka:
# Kafka Bootstrap nodes in "host:port" format
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"