Fix partition overlap error handling in SqlPartitioningRepository

This commit is contained in:
ViacheslavKlimov 2022-10-04 10:45:43 +03:00
parent a9f776587c
commit c7c8ea0105
4 changed files with 37 additions and 37 deletions

View File

@ -14,7 +14,27 @@
-- limitations under the License. -- limitations under the License.
-- --
CREATE TABLE IF NOT EXISTS tmp_audit_log ( DO
$$
DECLARE table_partition RECORD;
BEGIN
-- in case of running the upgrade script a second time:
IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN
ALTER TABLE audit_log RENAME TO old_audit_log;
ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time;
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
FROM pg_tables WHERE tablename LIKE 'audit_log_%'
LOOP
EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts);
END LOOP;
ELSE
RAISE NOTICE 'Table old_audit_log already exists, leaving as is';
END IF;
END;
$$;
CREATE TABLE IF NOT EXISTS audit_log (
id uuid NOT NULL, id uuid NOT NULL,
created_time bigint NOT NULL, created_time bigint NOT NULL,
tenant_id uuid, tenant_id uuid,
@ -29,21 +49,7 @@ CREATE TABLE IF NOT EXISTS tmp_audit_log (
action_status varchar(255), action_status varchar(255),
action_failure_details varchar(1000000) action_failure_details varchar(1000000)
) PARTITION BY RANGE (created_time); ) PARTITION BY RANGE (created_time);
CREATE INDEX IF NOT EXISTS idx_tmp_audit_log_tenant_id_and_created_time ON tmp_audit_log(tenant_id, created_time DESC); CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
CREATE OR REPLACE PROCEDURE rename_old_audit_logs_partitions()
LANGUAGE plpgsql AS
$$
DECLARE
table_partition RECORD;
BEGIN
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
FROM pg_tables WHERE tablename LIKE 'audit_log_%'
LOOP
EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts);
END LOOP;
END;
$$;
CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
LANGUAGE plpgsql AS LANGUAGE plpgsql AS
@ -52,17 +58,17 @@ DECLARE
p RECORD; p RECORD;
partition_end_ts BIGINT; partition_end_ts BIGINT;
BEGIN BEGIN
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM audit_log FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms WHERE created_time >= start_time_ms AND created_time < end_time_ms
LOOP LOOP
partition_end_ts = p.partition_ts + partition_size_ms; partition_end_ts = p.partition_ts + partition_size_ms;
RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF tmp_audit_log ' || EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' ||
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
END LOOP; END LOOP;
INSERT INTO tmp_audit_log INSERT INTO audit_log
SELECT * FROM audit_log SELECT * FROM old_audit_log
WHERE created_time >= start_time_ms AND created_time < end_time_ms; WHERE created_time >= start_time_ms AND created_time < end_time_ms;
END; END;
$$; $$;

View File

@ -28,7 +28,6 @@
<logger name="org.thingsboard.server" level="INFO"/> <logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/> <logger name="org.apache.kafka.common.utils.AppInfoParser" level="WARN"/>
<logger name="org.apache.kafka.clients" level="WARN"/> <logger name="org.apache.kafka.clients" level="WARN"/>
<logger name="org.thingsboard.server.dao.sqlts.insert.sql" level="TRACE"/>
<!-- To enable the logging of scanned rule engine components--> <!-- To enable the logging of scanned rule engine components-->
<!-- <logger name="org.thingsboard.server.service.component.AnnotationComponentDiscoveryService" level="DEBUG" />--> <!-- <logger name="org.thingsboard.server.service.component.AnnotationComponentDiscoveryService" level="DEBUG" />-->
<!-- Other useful logs --> <!-- Other useful logs -->

View File

@ -154,7 +154,6 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
throw new RuntimeException(error); throw new RuntimeException(error);
} }
jdbcTemplate.execute("CALL rename_old_audit_logs_partitions()");
while (startTime < currentTime) { while (startTime < currentTime) {
var endTime = startTime + partitionStepInMs; var endTime = startTime + partitionStepInMs;
log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); log.info("Migrating audit logs for time period: {} - {}", startTime, endTime);
@ -163,9 +162,7 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
} }
log.info("Audit logs migration finished"); log.info("Audit logs migration finished");
jdbcTemplate.execute("DROP TABLE IF EXISTS audit_log"); jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log");
jdbcTemplate.execute("ALTER TABLE tmp_audit_log RENAME TO audit_log");
jdbcTemplate.execute("ALTER INDEX IF EXISTS idx_tmp_audit_log_tenant_id_and_created_time RENAME TO idx_audit_log_tenant_id_and_created_time");
} }
private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) {

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sqlts.insert.sql;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
@ -68,17 +67,16 @@ public class SqlPartitioningRepository {
save(partition); save(partition);
log.trace("Adding partition to map: {}", partition); log.trace("Adding partition to map: {}", partition);
partitions.put(partition.getStart(), partition); partitions.put(partition.getStart(), partition);
} catch (Exception ex) { // fixme: check } catch (RuntimeException e) {
log.trace("Error occurred during partition save:", ex); log.trace("Error occurred during partition save:", e);
// todo: if partitions накладаються, потестити (ConstraintViolationException) String msg = ExceptionUtils.getRootCauseMessage(e);
// todo: SKIP_MIGRATION тільки для того щоб не переносити данні, таблицю треба створити partitioned. if (msg.contains("would overlap partition")) {
// fixme: update script log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}",
// if (ExceptionUtils.indexOfThrowable(ex, ConstraintViolationException.class) >= 0) { partition.getPartitionDate(), table, msg);
// log.warn("Saving partition [{}] rejected. Data will be saved to the DEFAULT partition.", partition.getPartitionDate()); partitions.put(partition.getStart(), partition);
// partitions.put(partition.getStart(), partition); } else {
// } else { throw e;
// throw ex; }
// }
} finally { } finally {
partitionCreationLock.unlock(); partitionCreationLock.unlock();
} }