deleted redundant code, yml parameters, reverted deleted PaginatedUpdater for future development
This commit is contained in:
parent
3c53d7ec71
commit
1fce9582af
@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.install.update;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
|
||||
@Slf4j
|
||||
public abstract class PaginatedUpdater<I, D> {
|
||||
|
||||
private static final int DEFAULT_LIMIT = 100;
|
||||
private int updated = 0;
|
||||
|
||||
public void updateEntities(I id) {
|
||||
updated = 0;
|
||||
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
|
||||
boolean hasNext = true;
|
||||
while (hasNext) {
|
||||
PageData<D> entities = findEntities(id, pageLink);
|
||||
for (D entity : entities.getData()) {
|
||||
updateEntity(entity);
|
||||
}
|
||||
updated += entities.getData().size();
|
||||
hasNext = entities.hasNext();
|
||||
if (hasNext) {
|
||||
log.info("{}: {} entities updated so far...", getName(), updated);
|
||||
pageLink = pageLink.nextPageLink();
|
||||
} else {
|
||||
if (updated > DEFAULT_LIMIT || forceReportTotal()) {
|
||||
log.info("{}: {} total entities updated.", getName(), updated);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void updateEntities() {
|
||||
updateEntities(null);
|
||||
}
|
||||
|
||||
protected boolean forceReportTotal() {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected abstract String getName();
|
||||
|
||||
protected abstract PageData<D> findEntities(I id, PageLink pageLink);
|
||||
|
||||
protected abstract void updateEntity(D entity);
|
||||
|
||||
}
|
||||
@ -1601,58 +1601,6 @@ queue:
|
||||
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
|
||||
# Max length of the error message that is printed by statistics
|
||||
max-error-message-length: "${TB_QUEUE_RULE_ENGINE_MAX_ERROR_MESSAGE_LENGTH:4096}"
|
||||
queues:
|
||||
- name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}" # queue name
|
||||
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" # queue topic
|
||||
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" # poll interval
|
||||
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" # number queue partitions
|
||||
consumer-per-partition: "${TB_QUEUE_RE_MAIN_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition
|
||||
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack
|
||||
submit-strategy:
|
||||
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
|
||||
# For BATCH only
|
||||
batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
|
||||
processing-strategy:
|
||||
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
|
||||
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages;
|
||||
pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}" # Time in seconds to wait in consumer thread before retries;
|
||||
max-pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:3}" # Max allowed time in seconds for pause between retries.
|
||||
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" # queue name
|
||||
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}" # queue topic
|
||||
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" # poll interval
|
||||
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}" # number queue partitions
|
||||
consumer-per-partition: "${TB_QUEUE_RE_HP_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition
|
||||
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack
|
||||
submit-strategy:
|
||||
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
|
||||
# For BATCH only
|
||||
batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
|
||||
processing-strategy:
|
||||
type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
|
||||
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages;
|
||||
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries;
|
||||
max-pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries.
|
||||
- name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}" # queue name
|
||||
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}" # queue topic
|
||||
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}" # poll interval
|
||||
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}" # number queue partitions
|
||||
consumer-per-partition: "${TB_QUEUE_RE_SQ_CONSUMER_PER_PARTITION:true}" # if true - use for each customer different partition
|
||||
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:2000}" # Timeout for processing a message pack
|
||||
submit-strategy:
|
||||
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
|
||||
# For BATCH only
|
||||
batch-size: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
|
||||
processing-strategy:
|
||||
type: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, SKIP_ALL_FAILURES_AND_TIMED_OUT, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
|
||||
retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
|
||||
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less than X percentage of messages;
|
||||
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries;
|
||||
max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries.
|
||||
# After a queue is deleted (or the profile's isolation option was disabled), Rule Engine will continue reading related topics during this period before deleting the actual topics
|
||||
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}"
|
||||
# Size of the thread pool that handles such operations as partition changes, config updates, queue deletion
|
||||
|
||||
@ -42,21 +42,6 @@ queue.transport.poll_interval=5
|
||||
queue.core.poll-interval=5
|
||||
queue.core.partitions=2
|
||||
queue.rule-engine.poll-interval=5
|
||||
queue.rule-engine.queues[0].poll-interval=5
|
||||
queue.rule-engine.queues[0].partitions=2
|
||||
queue.rule-engine.queues[0].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[1].poll-interval=5
|
||||
queue.rule-engine.queues[1].partitions=2
|
||||
queue.rule-engine.queues[1].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[1].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[1].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[2].poll-interval=5
|
||||
queue.rule-engine.queues[2].partitions=2
|
||||
queue.rule-engine.queues[2].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0
|
||||
|
||||
queue.rule-engine.stats.enabled=true
|
||||
usage.stats.report.enabled=false
|
||||
|
||||
@ -43,5 +43,4 @@ public interface EventService {
|
||||
|
||||
void cleanupEvents(long regularEventExpTs, long debugEventExpTs, boolean cleanupDb);
|
||||
|
||||
void migrateEvents();
|
||||
}
|
||||
|
||||
@ -130,11 +130,6 @@ public class BaseEventService implements EventService {
|
||||
eventDao.cleanupEvents(regularEventExpTs, debugEventExpTs, cleanupDb);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateEvents() {
|
||||
eventDao.migrateEvents(ttlInSec > 0 ? (System.currentTimeMillis() - ttlInSec * 1000) : 0, debugTtlInSec > 0 ? (System.currentTimeMillis() - debugTtlInSec * 1000) : 0);
|
||||
}
|
||||
|
||||
private PageData<EventInfo> convert(EntityType entityType, PageData<? extends Event> pd) {
|
||||
return new PageData<>(pd.getData() == null ? null :
|
||||
pd.getData().stream().map(e -> e.toInfo(entityType)).collect(Collectors.toList())
|
||||
|
||||
@ -124,20 +124,5 @@ queue.transport.poll_interval=5
|
||||
queue.core.poll-interval=5
|
||||
queue.core.partitions=2
|
||||
queue.rule-engine.poll-interval=5
|
||||
queue.rule-engine.queues[0].poll-interval=5
|
||||
queue.rule-engine.queues[0].partitions=2
|
||||
queue.rule-engine.queues[0].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[1].poll-interval=5
|
||||
queue.rule-engine.queues[1].partitions=2
|
||||
queue.rule-engine.queues[1].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[1].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[1].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[2].poll-interval=5
|
||||
queue.rule-engine.queues[2].partitions=2
|
||||
queue.rule-engine.queues[2].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0
|
||||
|
||||
spring.jpa.properties.hibernate.dialect=org.thingsboard.server.dao.ThingsboardPostgreSQLDialect
|
||||
@ -16,14 +16,3 @@ spring.datasource.password=postgres
|
||||
spring.datasource.url=jdbc:tc:postgresql:12.8:///thingsboard?TC_DAEMON=true&TC_TMPFS=/testtmpfs:rw&?TC_INITFUNCTION=org.thingsboard.server.dao.PostgreSqlInitializer::initDb
|
||||
spring.datasource.driverClassName=org.testcontainers.jdbc.ContainerDatabaseDriver
|
||||
spring.datasource.hikari.maximumPoolSize=16
|
||||
|
||||
queue.rule-engine.queues[0].name=Main
|
||||
queue.rule-engine.queues[0].topic=tb_rule_engine.main
|
||||
queue.rule-engine.queues[0].poll-interval=5
|
||||
queue.rule-engine.queues[0].partitions=2
|
||||
queue.rule-engine.queues[0].pack-processing-timeout=3000
|
||||
queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES
|
||||
queue.rule-engine.queues[0].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[0].submit-strategy.type=BURST
|
||||
|
||||
@ -41,15 +41,4 @@ service.type=monolith
|
||||
queue.core.pack-processing-timeout=3000
|
||||
queue.rule-engine.pack-processing-timeout=3000
|
||||
|
||||
queue.rule-engine.queues[0].name=Main
|
||||
queue.rule-engine.queues[0].topic=tb_rule_engine.main
|
||||
queue.rule-engine.queues[0].poll-interval=5
|
||||
queue.rule-engine.queues[0].partitions=2
|
||||
queue.rule-engine.queues[0].pack-processing-timeout=3000
|
||||
queue.rule-engine.queues[0].processing-strategy.type=SKIP_ALL_FAILURES
|
||||
queue.rule-engine.queues[0].processing-strategy.retries=1
|
||||
queue.rule-engine.queues[0].processing-strategy.pause-between-retries=0
|
||||
queue.rule-engine.queues[0].processing-strategy.max-pause-between-retries=0
|
||||
queue.rule-engine.queues[0].submit-strategy.type=BURST
|
||||
|
||||
sql.log_entity_queries=true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user