configurable edqs sync batch sizes

This commit is contained in:
dashevchenko 2025-02-05 12:20:23 +02:00
parent 60736e1237
commit b2bf5b9f65
13 changed files with 24 additions and 19 deletions

View File

@ -81,7 +81,7 @@ import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
@Slf4j
@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "true")
@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "true")
public class DefaultEdqsService implements EdqsService {
private final EdqsClientQueueFactory queueFactory;

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
@Service
@RequiredArgsConstructor
@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "true")
@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "true")
public class EdqsListener {
private final EdqsService edqsService;

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edqs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
@ -42,7 +43,6 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.sql.relation.RelationRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.tenant.TenantDao;
import java.util.List;
import java.util.Map;
@ -58,11 +58,13 @@ import static org.thingsboard.server.common.data.ObjectType.edqsTenantTypes;
@Slf4j
public abstract class EdqsSyncService {
@Value("${queue.edqs.sync.entity_batch_size:10000}")
private int entityBatchSize;
@Value("${queue.edqs.sync.ts_batch_size:10000}")
private int tsBatchSize;
@Autowired
private EntityDaoRegistry entityDaoRegistry;
@Autowired
private TenantDao tenantDao;
@Autowired
private AttributesDao attributesDao;
@Autowired
private KeyDictionaryDao keyDictionaryDao;
@ -112,7 +114,7 @@ public abstract class EdqsSyncService {
Dao<?> dao = entityDaoRegistry.getDao(entityType);
UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000");
while (true) {
var batch = dao.findNextBatch(lastId, 10000);
var batch = dao.findNextBatch(lastId, entityBatchSize);
if (batch.isEmpty()) {
break;
}
@ -140,7 +142,7 @@ public abstract class EdqsSyncService {
while (true) {
List<RelationEntity> batch = relationRepository.findNextBatch(lastFromEntityId, lastFromEntityType, lastRelationTypeGroup,
lastRelationType, lastToEntityId, lastToEntityType, 10000);
lastRelationType, lastToEntityId, lastToEntityType, entityBatchSize);
if (batch.isEmpty()) {
break;
}
@ -189,7 +191,7 @@ public abstract class EdqsSyncService {
int lastAttributeKey = Integer.MIN_VALUE;
while (true) {
List<AttributeKvEntity> batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, 10000);
List<AttributeKvEntity> batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, tsBatchSize);
if (batch.isEmpty()) {
break;
}
@ -228,7 +230,7 @@ public abstract class EdqsSyncService {
int lastKey = Integer.MIN_VALUE;
while (true) {
List<TsKvLatestEntity> batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, 10000);
List<TsKvLatestEntity> batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, tsBatchSize);
if (batch.isEmpty()) {
break;
}

View File

@ -27,7 +27,7 @@ import java.util.Collections;
@Service
@RequiredArgsConstructor
@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'")
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'")
public class KafkaEdqsSyncService extends EdqsSyncService {
private final TbKafkaSettings kafkaSettings;

View File

@ -22,7 +22,7 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb;
@Service
@RequiredArgsConstructor
@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}' == 'true' && '${queue.type:null}' == 'in-memory'")
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'in-memory'")
public class LocalEdqsSyncService extends EdqsSyncService {
private final EdqsRocksDb db;

View File

@ -1695,7 +1695,10 @@ queue:
# Statistics printing interval for Housekeeper
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
edqs:
sync_enabled: "${TB_EDQS_SYNC_ENABLED:true}" # FIXME: disable by default before release
sync:
enabled: "${TB_EDQS_SYNC_ENABLED:true}" # Enable/disable EDQS synchronization with postgres db FIXME: disable by default before release
entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}" # batch size of entities being synced with EDQS
ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}" # batch size of timeseries data being synced with EDQS
api_enabled: "${TB_EDQS_API_ENABLED:true}" # FIXME: disable by default before release
mode: "${TB_EDQS_MODE:local}" # local or remote
local:

View File

@ -36,7 +36,7 @@ import static org.awaitility.Awaitility.await;
@TestPropertySource(properties = {
// "queue.type=kafka", // uncomment to use Kafka
// "queue.kafka.bootstrap.servers=10.7.1.254:9092",
"queue.edqs.sync_enabled=true",
"queue.edqs.sync.enabled=true",
"queue.edqs.api_enabled=true",
"queue.edqs.mode=local"
})

View File

@ -34,7 +34,7 @@ import static org.awaitility.Awaitility.await;
@DaoSqlTest
@TestPropertySource(properties = {
"queue.edqs.sync_enabled=true",
"queue.edqs.sync.enabled=true",
"queue.edqs.api_enabled=true",
"queue.edqs.mode=local"
})

View File

@ -57,5 +57,5 @@ server.log_controller_error_stack_trace=true
transport.gateway.dashboard.sync.enabled=false
queue.edqs.sync_enabled=false
queue.edqs.sync.enabled=false
queue.edqs.api_enabled=false

View File

@ -22,7 +22,7 @@ import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
// TODO: tb-core ?
@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " +
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " +
"(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " +
"'${queue.edqs.mode:null}'=='local'))")
public @interface EdqsComponent {

View File

@ -21,6 +21,6 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && '${service.type:null}'=='monolith' && '${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='in-memory'")
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && '${service.type:null}'=='monolith' && '${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='in-memory'")
public @interface InMemoryEdqsComponent {
}

View File

@ -21,7 +21,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " +
@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " +
"(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " +
"'${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='kafka'))")
public @interface KafkaEdqsComponent {

View File

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.edqs.EdqsService;
@Service
@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "false", matchIfMissing = true)
@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "false", matchIfMissing = true)
public class DummyEdqsService implements EdqsService {
@Override