Added TTL for edge events
This commit is contained in:
		
							parent
							
								
									a75f52912e
								
							
						
					
					
						commit
						bf21ff3c09
					
				@ -0,0 +1,32 @@
 | 
			
		||||
--
 | 
			
		||||
-- Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
--
 | 
			
		||||
-- Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
-- you may not use this file except in compliance with the License.
 | 
			
		||||
-- You may obtain a copy of the License at
 | 
			
		||||
--
 | 
			
		||||
--     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
--
 | 
			
		||||
-- Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
-- distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
-- See the License for the specific language governing permissions and
 | 
			
		||||
-- limitations under the License.
 | 
			
		||||
--
 | 
			
		||||
 | 
			
		||||
CREATE OR REPLACE PROCEDURE cleanup_edge_events_by_ttl(IN ttl bigint, INOUT deleted bigint)
 | 
			
		||||
    LANGUAGE plpgsql AS
 | 
			
		||||
$$
 | 
			
		||||
DECLARE
 | 
			
		||||
    ttl_ts bigint;
 | 
			
		||||
    ttl_deleted_count bigint DEFAULT 0;
 | 
			
		||||
BEGIN
 | 
			
		||||
    IF ttl > 0 THEN
 | 
			
		||||
        ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint;
 | 
			
		||||
        EXECUTE format(
 | 
			
		||||
                'WITH deleted AS (DELETE FROM edge_event WHERE ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', ttl_ts) into ttl_deleted_count;
 | 
			
		||||
    END IF;
 | 
			
		||||
    RAISE NOTICE 'Edge events removed by ttl: %', ttl_deleted_count;
 | 
			
		||||
    deleted := ttl_deleted_count;
 | 
			
		||||
END
 | 
			
		||||
$$;
 | 
			
		||||
@ -0,0 +1,56 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2020 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.ttl.edge;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.dao.util.PsqlDao;
 | 
			
		||||
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
 | 
			
		||||
 | 
			
		||||
import java.sql.Connection;
 | 
			
		||||
import java.sql.DriverManager;
 | 
			
		||||
import java.sql.SQLException;
 | 
			
		||||
 | 
			
		||||
@PsqlDao
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Service
 | 
			
		||||
public class EdgeEventsCleanUpService extends AbstractCleanUpService {
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.edge_events.edge_events_ttl}")
 | 
			
		||||
    private long ttl;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.edge_events.enabled}")
 | 
			
		||||
    private boolean ttlTaskExecutionEnabled;
 | 
			
		||||
 | 
			
		||||
    @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
 | 
			
		||||
    public void cleanUp() {
 | 
			
		||||
        if (ttlTaskExecutionEnabled) {
 | 
			
		||||
            try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
 | 
			
		||||
                doCleanUp(conn);
 | 
			
		||||
            } catch (SQLException e) {
 | 
			
		||||
                log.error("SQLException occurred during TTL task execution ", e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected void doCleanUp(Connection connection) {
 | 
			
		||||
        long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);");
 | 
			
		||||
        log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -282,6 +282,10 @@ sql:
 | 
			
		||||
        execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
 | 
			
		||||
        events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds
 | 
			
		||||
        debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week
 | 
			
		||||
      edge_events:
 | 
			
		||||
        enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}"
 | 
			
		||||
        execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day
 | 
			
		||||
        edge_events_ttl: "${SQL_TTL_EDGE_EVENTS_TTL:2628000}" # Number of seconds. The current value corresponds to one month
 | 
			
		||||
 | 
			
		||||
# Actor system parameters
 | 
			
		||||
actors:
 | 
			
		||||
 | 
			
		||||
@ -307,3 +307,20 @@ BEGIN
 | 
			
		||||
    deleted := ttl_deleted_count + debug_ttl_deleted_count;
 | 
			
		||||
END
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
CREATE OR REPLACE PROCEDURE cleanup_edge_events_by_ttl(IN ttl bigint, INOUT deleted bigint)
 | 
			
		||||
    LANGUAGE plpgsql AS
 | 
			
		||||
$$
 | 
			
		||||
DECLARE
 | 
			
		||||
    ttl_ts bigint;
 | 
			
		||||
    ttl_deleted_count bigint DEFAULT 0;
 | 
			
		||||
BEGIN
 | 
			
		||||
    IF ttl > 0 THEN
 | 
			
		||||
        ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint;
 | 
			
		||||
        EXECUTE format(
 | 
			
		||||
                'WITH deleted AS (DELETE FROM edge_event WHERE ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted', ttl_ts) into ttl_deleted_count;
 | 
			
		||||
    END IF;
 | 
			
		||||
    RAISE NOTICE 'Edge events removed by ttl: %', ttl_deleted_count;
 | 
			
		||||
    deleted := ttl_deleted_count;
 | 
			
		||||
END
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user