Improvement to events cleanup script
This commit is contained in:
parent
fcbcae7f19
commit
7cc2508479
@ -0,0 +1,50 @@
|
|||||||
|
--
|
||||||
|
-- Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
--
|
||||||
|
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
-- you may not use this file except in compliance with the License.
|
||||||
|
-- You may obtain a copy of the License at
|
||||||
|
--
|
||||||
|
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
--
|
||||||
|
-- Unless required by applicable law or agreed to in writing, software
|
||||||
|
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
-- See the License for the specific language governing permissions and
|
||||||
|
-- limitations under the License.
|
||||||
|
--
|
||||||
|
|
||||||
|
|
||||||
|
DROP PROCEDURE IF EXISTS public.cleanup_events_by_ttl(bigint, bigint, bigint);
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(
|
||||||
|
IN regular_events_start_ts bigint,
|
||||||
|
IN regular_events_end_ts bigint,
|
||||||
|
IN debug_events_start_ts bigint,
|
||||||
|
IN debug_events_end_ts bigint,
|
||||||
|
INOUT deleted bigint)
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
DECLARE
|
||||||
|
ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
debug_ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
BEGIN
|
||||||
|
IF regular_events_start_ts > 0 AND regular_events_end_ts > 0 THEN
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE id in (SELECT id from event WHERE ts > %L::bigint AND ts < %L::bigint AND ' ||
|
||||||
|
'(event_type != %L::varchar AND event_type != %L::varchar AND event_type != %L::varchar AND event_type != %L::varchar)) RETURNING *) ' ||
|
||||||
|
'SELECT count(*) FROM deleted', regular_events_start_ts, regular_events_end_ts,
|
||||||
|
'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN', 'DEBUG_CONVERTER', 'DEBUG_INTEGRATION') into ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
IF debug_events_start_ts > 0 AND debug_events_end_ts > 0 THEN
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE id in (SELECT id from event WHERE ts > %L::bigint AND ts < %L::bigint AND ' ||
|
||||||
|
'(event_type = %L::varchar OR event_type = %L::varchar OR event_type = %L::varchar OR event_type = %L::varchar)) RETURNING *) ' ||
|
||||||
|
'SELECT count(*) FROM deleted', debug_events_start_ts, debug_events_end_ts,
|
||||||
|
'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN', 'DEBUG_CONVERTER', 'DEBUG_INTEGRATION') into debug_ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count;
|
||||||
|
RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count;
|
||||||
|
deleted := ttl_deleted_count + debug_ttl_deleted_count;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
@ -521,6 +521,10 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
|||||||
} catch (Exception ignored) {
|
} catch (Exception ignored) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("Updating TTL cleanup procedure for the event table...");
|
||||||
|
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.3", "schema_event_ttl_procedure.sql");
|
||||||
|
loadSql(schemaUpdateFile, conn);
|
||||||
|
|
||||||
log.info("Updating schema settings...");
|
log.info("Updating schema settings...");
|
||||||
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3003004;");
|
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3003004;");
|
||||||
log.info("Schema updated.");
|
log.info("Schema updated.");
|
||||||
|
|||||||
@ -22,7 +22,8 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.thingsboard.server.dao.event.EventService;
|
import org.thingsboard.server.dao.event.EventService;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -33,10 +34,13 @@ public class EventsCleanUpService extends AbstractCleanUpService {
|
|||||||
"#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.events.execution_interval_ms})}";
|
"#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.events.execution_interval_ms})}";
|
||||||
|
|
||||||
@Value("${sql.ttl.events.events_ttl}")
|
@Value("${sql.ttl.events.events_ttl}")
|
||||||
private long ttl;
|
private long ttlInSec;
|
||||||
|
|
||||||
@Value("${sql.ttl.events.debug_events_ttl}")
|
@Value("${sql.ttl.events.debug_events_ttl}")
|
||||||
private long debugTtl;
|
private long debugTtlInSec;
|
||||||
|
|
||||||
|
@Value("${sql.ttl.events.execution_interval_ms}")
|
||||||
|
private long executionIntervalInMs;
|
||||||
|
|
||||||
@Value("${sql.ttl.events.enabled}")
|
@Value("${sql.ttl.events.enabled}")
|
||||||
private boolean ttlTaskExecutionEnabled;
|
private boolean ttlTaskExecutionEnabled;
|
||||||
@ -51,7 +55,27 @@ public class EventsCleanUpService extends AbstractCleanUpService {
|
|||||||
@Scheduled(initialDelayString = RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
|
@Scheduled(initialDelayString = RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
|
||||||
public void cleanUp() {
|
public void cleanUp() {
|
||||||
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
||||||
eventService.cleanupEvents(ttl, debugTtl);
|
long ts = System.currentTimeMillis();
|
||||||
|
long regularEventStartTs;
|
||||||
|
long regularEventEndTs;
|
||||||
|
long debugEventStartTs;
|
||||||
|
long debugEventEndTs;
|
||||||
|
|
||||||
|
if (ttlInSec > 0) {
|
||||||
|
regularEventEndTs = ts - TimeUnit.SECONDS.toMillis(ttlInSec);
|
||||||
|
regularEventStartTs = regularEventEndTs - 2 * executionIntervalInMs;
|
||||||
|
} else {
|
||||||
|
regularEventStartTs = regularEventEndTs = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (debugTtlInSec > 0) {
|
||||||
|
debugEventEndTs = ts - TimeUnit.SECONDS.toMillis(debugTtlInSec);
|
||||||
|
debugEventStartTs = debugEventEndTs - 2 * executionIntervalInMs;
|
||||||
|
} else {
|
||||||
|
debugEventStartTs = debugEventEndTs = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
eventService.cleanupEvents(regularEventStartTs, regularEventEndTs, debugEventStartTs, debugEventEndTs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -42,7 +42,6 @@ public class EventsCleanUpServiceTest {
|
|||||||
public void givenInterval_whenRandomDelay_ThenDelayInInterval() {
|
public void givenInterval_whenRandomDelay_ThenDelayInInterval() {
|
||||||
log.info("randomDelay {}", randomDelayMs);
|
log.info("randomDelay {}", randomDelayMs);
|
||||||
log.info("executionIntervalMs {}", executionIntervalMs);
|
log.info("executionIntervalMs {}", executionIntervalMs);
|
||||||
assertThat(executionIntervalMs, is(2220000L));
|
|
||||||
assertThat(randomDelayMs, greaterThanOrEqualTo(0L));
|
assertThat(randomDelayMs, greaterThanOrEqualTo(0L));
|
||||||
assertThat(randomDelayMs, lessThanOrEqualTo(executionIntervalMs));
|
assertThat(randomDelayMs, lessThanOrEqualTo(executionIntervalMs));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -48,6 +48,6 @@ public interface EventService {
|
|||||||
|
|
||||||
void removeEvents(TenantId tenantId, EntityId entityId, EventFilter eventFilter, Long startTime, Long endTime);
|
void removeEvents(TenantId tenantId, EntityId entityId, EventFilter eventFilter, Long startTime, Long endTime);
|
||||||
|
|
||||||
void cleanupEvents(long ttl, long debugTtl);
|
void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -141,8 +141,8 @@ public class BaseEventService implements EventService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanupEvents(long ttl, long debugTtl) {
|
public void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs) {
|
||||||
eventDao.cleanupEvents(ttl, debugTtl);
|
eventDao.cleanupEvents(regularEventStartTs, regularEventEndTs, debugEventStartTs, debugEventEndTs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataValidator<Event> eventValidator =
|
private DataValidator<Event> eventValidator =
|
||||||
|
|||||||
@ -104,8 +104,10 @@ public interface EventDao extends Dao<Event> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
|
* Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
|
||||||
* @param otherEventsTtl the ttl for events in seconds
|
* @param regularEventStartTs the start time of the interval to use to delete non debug events
|
||||||
* @param debugEventsTtl the ttl for debug events in seconds
|
* @param regularEventEndTs the end time of the interval to use to delete non debug events
|
||||||
|
* @param debugEventStartTs the start time of the interval to use to delete debug events
|
||||||
|
* @param debugEventEndTs the end time of the interval to use to delete debug events
|
||||||
*/
|
*/
|
||||||
void cleanupEvents(long otherEventsTtl, long debugEventsTtl);
|
void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,6 @@ package org.thingsboard.server.dao.sql.event;
|
|||||||
|
|
||||||
public interface EventCleanupRepository {
|
public interface EventCleanupRepository {
|
||||||
|
|
||||||
void cleanupEvents(long otherEventsTtl, long debugEventsTtl);
|
void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,54 +0,0 @@
|
|||||||
/**
|
|
||||||
* Copyright © 2016-2022 The Thingsboard Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.thingsboard.server.dao.sql.event;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.stereotype.Repository;
|
|
||||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
|
|
||||||
import org.thingsboard.server.dao.util.HsqlDao;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
|
||||||
import java.sql.PreparedStatement;
|
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@HsqlDao
|
|
||||||
@Repository
|
|
||||||
public class HsqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
|
|
||||||
long otherExpirationTime = System.currentTimeMillis() - otherEventsTtl * 1000;
|
|
||||||
try (Connection connection = dataSource.getConnection();
|
|
||||||
PreparedStatement stmt = connection.prepareStatement("DELETE FROM event WHERE ts < ? AND event_type != 'DEBUG_RULE_NODE' AND event_type != 'DEBUG_RULE_CHAIN'")) {
|
|
||||||
stmt.setLong(1, otherExpirationTime);
|
|
||||||
stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
|
|
||||||
stmt.execute();
|
|
||||||
} catch (SQLException e) {
|
|
||||||
log.error("SQLException occurred during events TTL task execution ", e);
|
|
||||||
}
|
|
||||||
long debugExpirationTime = System.currentTimeMillis() - debugEventsTtl * 1000;
|
|
||||||
try (Connection connection = dataSource.getConnection();
|
|
||||||
PreparedStatement stmt = connection.prepareStatement("DELETE FROM event WHERE ts < ? AND (event_type = 'DEBUG_RULE_NODE' OR event_type = 'DEBUG_RULE_CHAIN')")) {
|
|
||||||
stmt.setLong(1, debugExpirationTime);
|
|
||||||
stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
|
|
||||||
stmt.execute();
|
|
||||||
} catch (SQLException e) {
|
|
||||||
log.error("SQLException occurred during events TTL task execution ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -259,9 +259,9 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
|
public void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs) {
|
||||||
log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugEventsTtl, otherEventsTtl);
|
log.info("Going to cleanup old events. Interval for regular events: [{}:{}], for debug events: [{}:{}]", regularEventStartTs, regularEventEndTs, debugEventStartTs, debugEventEndTs);
|
||||||
eventCleanupRepository.cleanupEvents(otherEventsTtl, debugEventsTtl);
|
eventCleanupRepository.cleanupEvents(regularEventStartTs, regularEventEndTs, debugEventStartTs, debugEventEndTs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<Event> save(EventEntity entity, boolean ifNotExists) {
|
public Optional<Event> save(EventEntity entity, boolean ifNotExists) {
|
||||||
|
|||||||
@ -32,12 +32,14 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class PsqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
|
public class PsqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
|
public void cleanupEvents(long regularEventStartTs, long regularEventEndTs, long debugEventStartTs, long debugEventEndTs) {
|
||||||
try (Connection connection = dataSource.getConnection();
|
try (Connection connection = dataSource.getConnection();
|
||||||
PreparedStatement stmt = connection.prepareStatement("call cleanup_events_by_ttl(?,?,?)")) {
|
PreparedStatement stmt = connection.prepareStatement("call cleanup_events_by_ttl(?,?,?,?,?)")) {
|
||||||
stmt.setLong(1, otherEventsTtl);
|
stmt.setLong(1, regularEventStartTs);
|
||||||
stmt.setLong(2, debugEventsTtl);
|
stmt.setLong(2, regularEventEndTs);
|
||||||
stmt.setLong(3, 0);
|
stmt.setLong(3, debugEventStartTs);
|
||||||
|
stmt.setLong(4, debugEventEndTs);
|
||||||
|
stmt.setLong(5, 0);
|
||||||
stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
|
stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
|
||||||
stmt.execute();
|
stmt.execute();
|
||||||
printWarnings(stmt);
|
printWarnings(stmt);
|
||||||
|
|||||||
@ -632,24 +632,31 @@ CREATE TABLE IF NOT EXISTS rpc (
|
|||||||
status varchar(255) NOT NULL
|
status varchar(255) NOT NULL
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
|
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(
|
||||||
|
IN regular_events_start_ts bigint,
|
||||||
|
IN regular_events_end_ts bigint,
|
||||||
|
IN debug_events_start_ts bigint,
|
||||||
|
IN debug_events_end_ts bigint,
|
||||||
|
INOUT deleted bigint)
|
||||||
LANGUAGE plpgsql AS
|
LANGUAGE plpgsql AS
|
||||||
$$
|
$$
|
||||||
DECLARE
|
DECLARE
|
||||||
ttl_ts bigint;
|
|
||||||
debug_ttl_ts bigint;
|
|
||||||
ttl_deleted_count bigint DEFAULT 0;
|
ttl_deleted_count bigint DEFAULT 0;
|
||||||
debug_ttl_deleted_count bigint DEFAULT 0;
|
debug_ttl_deleted_count bigint DEFAULT 0;
|
||||||
BEGIN
|
BEGIN
|
||||||
IF ttl > 0 THEN
|
IF regular_events_start_ts > 0 AND regular_events_end_ts > 0 THEN
|
||||||
ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint;
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type != %L::varchar AND event_type != %L::varchar) RETURNING *) SELECT count(*) FROM deleted', ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into ttl_deleted_count;
|
'WITH deleted AS (DELETE FROM event WHERE id in (SELECT id from event WHERE ts > %L::bigint AND ts < %L::bigint AND ' ||
|
||||||
|
'(event_type != %L::varchar AND event_type != %L::varchar AND event_type != %L::varchar AND event_type != %L::varchar)) RETURNING *) ' ||
|
||||||
|
'SELECT count(*) FROM deleted', regular_events_start_ts, regular_events_end_ts,
|
||||||
|
'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN', 'DEBUG_CONVERTER', 'DEBUG_INTEGRATION') into ttl_deleted_count;
|
||||||
END IF;
|
END IF;
|
||||||
IF debug_ttl > 0 THEN
|
IF debug_events_start_ts > 0 AND debug_events_end_ts > 0 THEN
|
||||||
debug_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - debug_ttl::bigint * 1000)::bigint;
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type = %L::varchar OR event_type = %L::varchar) RETURNING *) SELECT count(*) FROM deleted', debug_ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into debug_ttl_deleted_count;
|
'WITH deleted AS (DELETE FROM event WHERE id in (SELECT id from event WHERE ts > %L::bigint AND ts < %L::bigint AND ' ||
|
||||||
|
'(event_type = %L::varchar OR event_type = %L::varchar OR event_type = %L::varchar OR event_type = %L::varchar)) RETURNING *) ' ||
|
||||||
|
'SELECT count(*) FROM deleted', debug_events_start_ts, debug_events_end_ts,
|
||||||
|
'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN', 'DEBUG_CONVERTER', 'DEBUG_INTEGRATION') into debug_ttl_deleted_count;
|
||||||
END IF;
|
END IF;
|
||||||
RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count;
|
RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count;
|
||||||
RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count;
|
RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user