diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 160d4bb565..e7243ed3bc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -81,7 +81,7 @@ import java.util.concurrent.TimeUnit; @Service @RequiredArgsConstructor @Slf4j -@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "true") +@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "true") public class DefaultEdqsService implements EdqsService { private final EdqsClientQueueFactory queueFactory; diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsListener.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsListener.java index c4ce0c7a7e..dc8d4bf36d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsListener.java @@ -28,7 +28,7 @@ import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; @Service @RequiredArgsConstructor -@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "true") +@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "true") public class EdqsListener { private final EdqsService edqsService; diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index c2750ae840..80aeee8635 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.edqs; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; @@ -42,7 +43,6 @@ import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sql.relation.RelationRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; -import org.thingsboard.server.dao.tenant.TenantDao; import java.util.List; import java.util.Map; @@ -58,11 +58,13 @@ import static org.thingsboard.server.common.data.ObjectType.edqsTenantTypes; @Slf4j public abstract class EdqsSyncService { + @Value("${queue.edqs.sync.entity_batch_size:10000}") + private int entityBatchSize; + @Value("${queue.edqs.sync.ts_batch_size:10000}") + private int tsBatchSize; @Autowired private EntityDaoRegistry entityDaoRegistry; @Autowired - private TenantDao tenantDao; - @Autowired private AttributesDao attributesDao; @Autowired private KeyDictionaryDao keyDictionaryDao; @@ -112,7 +114,7 @@ public abstract class EdqsSyncService { Dao dao = entityDaoRegistry.getDao(entityType); UUID lastId = UUID.fromString("00000000-0000-0000-0000-000000000000"); while (true) { - var batch = dao.findNextBatch(lastId, 10000); + var batch = dao.findNextBatch(lastId, entityBatchSize); if (batch.isEmpty()) { break; } @@ -140,7 +142,7 @@ public abstract class EdqsSyncService { while (true) { List batch = relationRepository.findNextBatch(lastFromEntityId, lastFromEntityType, lastRelationTypeGroup, - lastRelationType, lastToEntityId, lastToEntityType, 10000); + lastRelationType, lastToEntityId, lastToEntityType, entityBatchSize); if (batch.isEmpty()) { break; } @@ -189,7 +191,7 @@ public abstract class EdqsSyncService { int lastAttributeKey = Integer.MIN_VALUE; while (true) { - List batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, 10000); + List batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, tsBatchSize); if (batch.isEmpty()) { break; } @@ -228,7 +230,7 @@ public abstract class EdqsSyncService { int lastKey = Integer.MIN_VALUE; while (true) { - List batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, 10000); + List batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, tsBatchSize); if (batch.isEmpty()) { break; } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index f4e9a02b45..0c6f948770 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -27,7 +27,7 @@ import java.util.Collections; @Service @RequiredArgsConstructor -@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'") +@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'kafka'") public class KafkaEdqsSyncService extends EdqsSyncService { private final TbKafkaSettings kafkaSettings; diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/LocalEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/LocalEdqsSyncService.java index 924bf94830..11d5894307 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/LocalEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/LocalEdqsSyncService.java @@ -22,7 +22,7 @@ import org.thingsboard.server.edqs.util.EdqsRocksDb; @Service @RequiredArgsConstructor -@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}' == 'true' && '${queue.type:null}' == 'in-memory'") +@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}' == 'true' && '${queue.type:null}' == 'in-memory'") public class LocalEdqsSyncService extends EdqsSyncService { private final EdqsRocksDb db; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index dd0e5948c7..e41d0c2aed 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1695,7 +1695,10 @@ queue: # Statistics printing interval for Housekeeper print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}" edqs: - sync_enabled: "${TB_EDQS_SYNC_ENABLED:true}" # FIXME: disable by default before release + sync: + enabled: "${TB_EDQS_SYNC_ENABLED:true}" # Enable/disable EDQS synchronization with postgres db FIXME: disable by default before release + entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}" # batch size of entities being synced with EDQS + ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}" # batch size of timeseries data being synced with EDQS api_enabled: "${TB_EDQS_API_ENABLED:true}" # FIXME: disable by default before release mode: "${TB_EDQS_MODE:local}" # local or remote local: diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index baf9f98993..9bb0bbe30a 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -36,7 +36,7 @@ import static org.awaitility.Awaitility.await; @TestPropertySource(properties = { // "queue.type=kafka", // uncomment to use Kafka // "queue.kafka.bootstrap.servers=10.7.1.254:9092", - "queue.edqs.sync_enabled=true", + "queue.edqs.sync.enabled=true", "queue.edqs.api_enabled=true", "queue.edqs.mode=local" }) diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java index cf24deb7e4..cb377e0431 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EdqsEntityServiceTest.java @@ -34,7 +34,7 @@ import static org.awaitility.Awaitility.await; @DaoSqlTest @TestPropertySource(properties = { - "queue.edqs.sync_enabled=true", + "queue.edqs.sync.enabled=true", "queue.edqs.api_enabled=true", "queue.edqs.mode=local" }) diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index ba6863c705..8933c36db5 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -57,5 +57,5 @@ server.log_controller_error_stack_trace=true transport.gateway.dashboard.sync.enabled=false -queue.edqs.sync_enabled=false +queue.edqs.sync.enabled=false queue.edqs.api_enabled=false diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java index 838f9b4aa2..27a1e09f79 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsComponent.java @@ -22,7 +22,7 @@ import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) // TODO: tb-core ? -@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " + +@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " + "(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " + "'${queue.edqs.mode:null}'=='local'))") public @interface EdqsComponent { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsComponent.java index 5055787fde..3e04dda3b5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/InMemoryEdqsComponent.java @@ -21,6 +21,6 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) -@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && '${service.type:null}'=='monolith' && '${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='in-memory'") +@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && '${service.type:null}'=='monolith' && '${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='in-memory'") public @interface InMemoryEdqsComponent { } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsComponent.java index 9f112e7f59..9963972e9c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/KafkaEdqsComponent.java @@ -21,7 +21,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @Retention(RetentionPolicy.RUNTIME) -@ConditionalOnExpression("'${queue.edqs.sync_enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " + +@ConditionalOnExpression("'${queue.edqs.sync.enabled:true}'=='true' && ('${service.type:null}'=='edqs' || " + "(('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && " + "'${queue.edqs.mode:null}'=='local' && '${queue.type:null}'=='kafka'))") public @interface KafkaEdqsComponent { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java index 91934293a6..b69aa52ec5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DummyEdqsService.java @@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.edqs.EdqsService; @Service -@ConditionalOnProperty(value = "queue.edqs.sync_enabled", havingValue = "false", matchIfMissing = true) +@ConditionalOnProperty(value = "queue.edqs.sync.enabled", havingValue = "false", matchIfMissing = true) public class DummyEdqsService implements EdqsService { @Override