diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java new file mode 100644 index 0000000000..3f2a80f4e2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/install/update/PaginatedUpdater.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.install.update; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; + +@Slf4j +public abstract class PaginatedUpdater { + + private static final int DEFAULT_LIMIT = 100; + private int updated = 0; + + public void updateEntities(I id) { + updated = 0; + PageLink pageLink = new PageLink(DEFAULT_LIMIT); + boolean hasNext = true; + while (hasNext) { + PageData entities = findEntities(id, pageLink); + for (D entity : entities.getData()) { + updateEntity(entity); + } + updated += entities.getData().size(); + hasNext = entities.hasNext(); + if (hasNext) { + log.info("{}: {} entities updated so far...", getName(), updated); + pageLink = pageLink.nextPageLink(); + } else { + if (updated > DEFAULT_LIMIT || forceReportTotal()) { + log.info("{}: {} total entities updated.", getName(), updated); + } + } + } + } + + public void updateEntities() { + updateEntities(null); + } + + protected boolean forceReportTotal() { + return false; + } + + protected abstract String getName(); + + protected abstract PageData findEntities(I id, PageLink pageLink); + + protected abstract void updateEntity(D entity); + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 936f30af60..70a5146bb7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1601,58 +1601,6 @@ queue: print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}" # Max length of the error message that is printed by statistics max-error-message-length: "${TB_QUEUE_RULE_ENGINE_MAX_ERROR_MESSAGE_LENGTH:4096}" - queues: - - name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}" # queue name - topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" # queue topic - poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" # poll interval - partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" # number queue partitions - consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition - pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack - submit-strategy: - type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL - # For BATCH only - batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch - processing-strategy: - type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages; - pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}" # Time in seconds to wait in consumer thread before retries; - max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}" # Max allowed time in seconds for pause between retries. - - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" # queue name - topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" # queue topic - poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" # poll interval - partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}" # number queue partitions - consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition - pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack - submit-strategy: - type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL - # For BATCH only - batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch - processing-strategy: - type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages; - pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; - max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. - - name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" # queue name - topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" # queue topic - poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" # poll interval - partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}" # number queue partitions - consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition - pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack - submit-strategy: - type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL - # For BATCH only - batch-size: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch - processing-strategy: - type: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages; - pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; - max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. # After a queue is deleted (or the profile's isolation option was disabled), Rule Engine will continue reading related topics during this period before deleting the actual topics topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}" # Size of the thread pool that handles such operations as partition changes, config updates, queue deletion diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index b01ffe0d72..92959e4552 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -42,21 +42,6 @@ queue.transport.poll_interval=5 queue.core.poll-interval=5 queue.core.partitions=2 queue.rule-engine.poll-interval=5 -queue.rule-engine.queues[0].poll-interval=5 -queue.rule-engine.queues[0].partitions=2 -queue.rule-engine.queues[0].processing-strategy.retries=1 -queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[1].poll-interval=5 -queue.rule-engine.queues[1].partitions=2 -queue.rule-engine.queues[1].processing-strategy.retries=1 -queue.rule-engine.queues[1].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[1].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[2].poll-interval=5 -queue.rule-engine.queues[2].partitions=2 -queue.rule-engine.queues[2].processing-strategy.retries=1 -queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 queue.rule-engine.stats.enabled=true usage.stats.report.enabled=false diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java index 219b4163f9..9b0d5a280d 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java @@ -43,5 +43,4 @@ public interface EventService { void cleanupEvents(long regularEventExpTs, long debugEventExpTs, boolean cleanupDb); - void migrateEvents(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index b9d7f7df86..88834eecbc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -130,11 +130,6 @@ public class BaseEventService implements EventService { eventDao.cleanupEvents(regularEventExpTs, debugEventExpTs, cleanupDb); } - @Override - public void migrateEvents() { - eventDao.migrateEvents(ttlInSec > 0 ? (System.currentTimeMillis() - ttlInSec * 1000) : 0, debugTtlInSec > 0 ? (System.currentTimeMillis() - debugTtlInSec * 1000) : 0); - } - private PageData convert(EntityType entityType, PageData pd) { return new PageData<>(pd.getData() == null ? null : pd.getData().stream().map(e -> e.toInfo(entityType)).collect(Collectors.toList()) diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 05f2cf7670..1b5c4b93da 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -124,20 +124,5 @@ queue.transport.poll_interval=5 queue.core.poll-interval=5 queue.core.partitions=2 queue.rule-engine.poll-interval=5 -queue.rule-engine.queues[0].poll-interval=5 -queue.rule-engine.queues[0].partitions=2 -queue.rule-engine.queues[0].processing-strategy.retries=1 -queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[1].poll-interval=5 -queue.rule-engine.queues[1].partitions=2 -queue.rule-engine.queues[1].processing-strategy.retries=1 -queue.rule-engine.queues[1].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[1].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[2].poll-interval=5 -queue.rule-engine.queues[2].partitions=2 -queue.rule-engine.queues[2].processing-strategy.retries=1 -queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 spring.jpa.properties.hibernate.dialect=org.thingsboard.server.dao.ThingsboardPostgreSQLDialect \ No newline at end of file diff --git a/dao/src/test/resources/nosql-test.properties b/dao/src/test/resources/nosql-test.properties index d63a9f7f82..d72bb10b82 100644 --- a/dao/src/test/resources/nosql-test.properties +++ b/dao/src/test/resources/nosql-test.properties @@ -16,14 +16,3 @@ spring.datasource.password=postgres spring.datasource.url=jdbc:tc:postgresql:12.8:///thingsboard?TC_DAEMON=true&TC_TMPFS=/testtmpfs:rw&?TC_INITFUNCTION=org.thingsboard.server.dao.PostgreSqlInitializer::initDb spring.datasource.driverClassName=org.testcontainers.jdbc.ContainerDatabaseDriver spring.datasource.hikari.maximumPoolSize=16 - -queue.rule-engine.queues[0].name=Main -queue.rule-engine.queues[0].topic=tb_rule_engine.main -queue.rule-engine.queues[0].poll-interval=5 -queue.rule-engine.queues[0].partitions=2 -queue.rule-engine.queues[0].pack-processing-timeout=3000 -queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES -queue.rule-engine.queues[0].processing-strategy.retries=1 -queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[0].submit-strategy.type=BURST diff --git a/dao/src/test/resources/sql-test.properties b/dao/src/test/resources/sql-test.properties index 2fed33b43a..4292db28d6 100644 --- a/dao/src/test/resources/sql-test.properties +++ b/dao/src/test/resources/sql-test.properties @@ -41,15 +41,4 @@ service.type=monolith queue.core.pack-processing-timeout=3000 queue.rule-engine.pack-processing-timeout=3000 -queue.rule-engine.queues[0].name=Main -queue.rule-engine.queues[0].topic=tb_rule_engine.main -queue.rule-engine.queues[0].poll-interval=5 -queue.rule-engine.queues[0].partitions=2 -queue.rule-engine.queues[0].pack-processing-timeout=3000 -queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES -queue.rule-engine.queues[0].processing-strategy.retries=1 -queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0 -queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0 -queue.rule-engine.queues[0].submit-strategy.type=BURST - sql.log_entity_queries=true