Merge pull request #2655 from ShvaykaD/feature/psql-event-ttl
Events TTL for PostgreSQL
This commit is contained in:
commit
c286a489dd
@ -123,3 +123,28 @@ BEGIN
|
|||||||
END LOOP;
|
END LOOP;
|
||||||
END
|
END
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
DECLARE
|
||||||
|
ttl_ts bigint;
|
||||||
|
debug_ttl_ts bigint;
|
||||||
|
ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
debug_ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
BEGIN
|
||||||
|
IF ttl > 0 THEN
|
||||||
|
ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint;
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type != %L::varchar AND event_type != %L::varchar) RETURNING *) SELECT count(*) FROM deleted', ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
IF debug_ttl > 0 THEN
|
||||||
|
debug_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - debug_ttl::bigint * 1000)::bigint;
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type = %L::varchar OR event_type = %L::varchar) RETURNING *) SELECT count(*) FROM deleted', debug_ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into debug_ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count;
|
||||||
|
RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count;
|
||||||
|
deleted := ttl_deleted_count + debug_ttl_deleted_count;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|||||||
@ -23,8 +23,8 @@ import org.springframework.context.ApplicationContext;
|
|||||||
import org.springframework.context.annotation.Profile;
|
import org.springframework.context.annotation.Profile;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
||||||
import org.thingsboard.server.service.install.DatabaseTsUpgradeService;
|
|
||||||
import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService;
|
import org.thingsboard.server.service.install.DatabaseEntitiesUpgradeService;
|
||||||
|
import org.thingsboard.server.service.install.DatabaseTsUpgradeService;
|
||||||
import org.thingsboard.server.service.install.EntityDatabaseSchemaService;
|
import org.thingsboard.server.service.install.EntityDatabaseSchemaService;
|
||||||
import org.thingsboard.server.service.install.SystemDataLoaderService;
|
import org.thingsboard.server.service.install.SystemDataLoaderService;
|
||||||
import org.thingsboard.server.service.install.TsDatabaseSchemaService;
|
import org.thingsboard.server.service.install.TsDatabaseSchemaService;
|
||||||
|
|||||||
@ -225,6 +225,11 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
|||||||
conn.createStatement().execute("ALTER TABLE tenant ADD COLUMN isolated_tb_core boolean DEFAULT (false), ADD COLUMN isolated_tb_rule_engine boolean DEFAULT (false)");
|
conn.createStatement().execute("ALTER TABLE tenant ADD COLUMN isolated_tb_core boolean DEFAULT (false), ADD COLUMN isolated_tb_rule_engine boolean DEFAULT (false)");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
long ts = System.currentTimeMillis();
|
||||||
|
conn.createStatement().execute("ALTER TABLE event ADD COLUMN ts bigint DEFAULT " + ts + ";"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
log.info("Schema updated.");
|
log.info("Schema updated.");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -17,47 +17,27 @@ package org.thingsboard.server.service.ttl;
|
|||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.thingsboard.server.dao.util.PsqlDao;
|
||||||
import org.thingsboard.server.dao.util.PsqlTsAnyDao;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.sql.SQLWarning;
|
import java.sql.SQLWarning;
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
|
|
||||||
@PsqlTsAnyDao
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractTimeseriesCleanUpService {
|
@PsqlDao
|
||||||
|
public abstract class AbstractCleanUpService {
|
||||||
@Value("${sql.ttl.ts_key_value_ttl}")
|
|
||||||
protected long systemTtl;
|
|
||||||
|
|
||||||
@Value("${sql.ttl.enabled}")
|
|
||||||
private boolean ttlTaskExecutionEnabled;
|
|
||||||
|
|
||||||
@Value("${spring.datasource.url}")
|
@Value("${spring.datasource.url}")
|
||||||
private String dbUrl;
|
protected String dbUrl;
|
||||||
|
|
||||||
@Value("${spring.datasource.username}")
|
@Value("${spring.datasource.username}")
|
||||||
private String dbUserName;
|
protected String dbUserName;
|
||||||
|
|
||||||
@Value("${spring.datasource.password}")
|
@Value("${spring.datasource.password}")
|
||||||
private String dbPassword;
|
protected String dbPassword;
|
||||||
|
|
||||||
@Scheduled(initialDelayString = "${sql.ttl.execution_interval_ms}", fixedDelayString = "${sql.ttl.execution_interval_ms}")
|
|
||||||
public void cleanUp() {
|
|
||||||
if (ttlTaskExecutionEnabled) {
|
|
||||||
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
|
|
||||||
doCleanUp(conn);
|
|
||||||
} catch (SQLException e) {
|
|
||||||
log.error("SQLException occurred during TTL task execution ", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract void doCleanUp(Connection connection);
|
|
||||||
|
|
||||||
protected long executeQuery(Connection conn, String query) {
|
protected long executeQuery(Connection conn, String query) {
|
||||||
long removed = 0L;
|
long removed = 0L;
|
||||||
@ -74,7 +54,7 @@ public abstract class AbstractTimeseriesCleanUpService {
|
|||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getWarnings(Statement statement) throws SQLException {
|
protected void getWarnings(Statement statement) throws SQLException {
|
||||||
SQLWarning warnings = statement.getWarnings();
|
SQLWarning warnings = statement.getWarnings();
|
||||||
if (warnings != null) {
|
if (warnings != null) {
|
||||||
log.debug("{}", warnings.getMessage());
|
log.debug("{}", warnings.getMessage());
|
||||||
@ -86,4 +66,6 @@ public abstract class AbstractTimeseriesCleanUpService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract void doCleanUp(Connection connection);
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -0,0 +1,59 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2020 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.ttl.events;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.server.dao.util.PsqlDao;
|
||||||
|
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
@PsqlDao
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class EventsCleanUpService extends AbstractCleanUpService {
|
||||||
|
|
||||||
|
@Value("${sql.ttl.events.events_ttl}")
|
||||||
|
private long ttl;
|
||||||
|
|
||||||
|
@Value("${sql.ttl.events.debug_events_ttl}")
|
||||||
|
private long debugTtl;
|
||||||
|
|
||||||
|
@Value("${sql.ttl.events.enabled}")
|
||||||
|
private boolean ttlTaskExecutionEnabled;
|
||||||
|
|
||||||
|
@Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
|
||||||
|
public void cleanUp() {
|
||||||
|
if (ttlTaskExecutionEnabled) {
|
||||||
|
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
|
||||||
|
doCleanUp(conn);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("SQLException occurred during TTL task execution ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doCleanUp(Connection connection) {
|
||||||
|
long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);");
|
||||||
|
log.info("Total events removed by TTL: [{}]", totalEventsRemoved);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,49 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2020 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.ttl.timeseries;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.thingsboard.server.dao.util.PsqlTsAnyDao;
|
||||||
|
import org.thingsboard.server.service.ttl.AbstractCleanUpService;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
@PsqlTsAnyDao
|
||||||
|
@Slf4j
|
||||||
|
public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService {
|
||||||
|
|
||||||
|
@Value("${sql.ttl.ts.ts_key_value_ttl}")
|
||||||
|
protected long systemTtl;
|
||||||
|
|
||||||
|
@Value("${sql.ttl.ts.enabled}")
|
||||||
|
private boolean ttlTaskExecutionEnabled;
|
||||||
|
|
||||||
|
@Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}")
|
||||||
|
public void cleanUp() {
|
||||||
|
if (ttlTaskExecutionEnabled) {
|
||||||
|
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
|
||||||
|
doCleanUp(conn);
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("SQLException occurred during TTL task execution ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -13,7 +13,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ttl;
|
package org.thingsboard.server.service.ttl.timeseries;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
@ -13,7 +13,7 @@
|
|||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.ttl;
|
package org.thingsboard.server.service.ttl.timeseries;
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
@ -203,9 +203,15 @@ sql:
|
|||||||
# Specify Interval size for new data chunks storage.
|
# Specify Interval size for new data chunks storage.
|
||||||
chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
|
chunk_time_interval: "${SQL_TIMESCALE_CHUNK_TIME_INTERVAL:604800000}"
|
||||||
ttl:
|
ttl:
|
||||||
enabled: "${SQL_TTL_ENABLED:true}"
|
ts:
|
||||||
execution_interval_ms: "${SQL_TTL_EXECUTION_INTERVAL:86400000}" # Number of miliseconds
|
enabled: "${SQL_TTL_TS_ENABLED:true}"
|
||||||
ts_key_value_ttl: "${SQL_TTL_TS_KEY_VALUE_TTL:0}" # Number of seconds
|
execution_interval_ms: "${SQL_TTL_TS_EXECUTION_INTERVAL:86400000}" # Number of miliseconds. The current value corresponds to one day
|
||||||
|
ts_key_value_ttl: "${SQL_TTL_TS_TS_KEY_VALUE_TTL:0}" # Number of seconds
|
||||||
|
events:
|
||||||
|
enabled: "${SQL_TTL_EVENTS_ENABLED:true}"
|
||||||
|
execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of miliseconds. The current value corresponds to one day
|
||||||
|
events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds
|
||||||
|
debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week
|
||||||
|
|
||||||
# Actor system parameters
|
# Actor system parameters
|
||||||
actors:
|
actors:
|
||||||
|
|||||||
@ -32,6 +32,9 @@ public class ModelConstants {
|
|||||||
public static final String NULL_UUID_STR = UUIDConverter.fromTimeUUID(NULL_UUID);
|
public static final String NULL_UUID_STR = UUIDConverter.fromTimeUUID(NULL_UUID);
|
||||||
public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
|
public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
|
||||||
|
|
||||||
|
// this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC as 100 nanosecond units
|
||||||
|
public static final long EPOCH_DIFF = 122192928000000000L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generic constants.
|
* Generic constants.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -37,6 +37,9 @@ import javax.persistence.EnumType;
|
|||||||
import javax.persistence.Enumerated;
|
import javax.persistence.Enumerated;
|
||||||
import javax.persistence.Table;
|
import javax.persistence.Table;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.dao.model.ModelConstants.EPOCH_DIFF;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BODY_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BODY_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_ID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_ID_PROPERTY;
|
||||||
@ -44,6 +47,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EVENT_ENTITY_TYPE_
|
|||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TENANT_ID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TENANT_ID_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_PROPERTY;
|
||||||
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_UID_PROPERTY;
|
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_UID_PROPERTY;
|
||||||
|
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
@ -73,9 +77,15 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve
|
|||||||
@Column(name = EVENT_BODY_PROPERTY)
|
@Column(name = EVENT_BODY_PROPERTY)
|
||||||
private JsonNode body;
|
private JsonNode body;
|
||||||
|
|
||||||
|
@Column(name = TS_COLUMN)
|
||||||
|
private long ts;
|
||||||
|
|
||||||
public EventEntity(Event event) {
|
public EventEntity(Event event) {
|
||||||
if (event.getId() != null) {
|
if (event.getId() != null) {
|
||||||
this.setUuid(event.getId().getId());
|
this.setUuid(event.getId().getId());
|
||||||
|
this.ts = getTs(event.getId().getId());
|
||||||
|
} else {
|
||||||
|
this.ts = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
if (event.getTenantId() != null) {
|
if (event.getTenantId() != null) {
|
||||||
this.tenantId = toString(event.getTenantId().getId());
|
this.tenantId = toString(event.getTenantId().getId());
|
||||||
@ -101,4 +111,8 @@ public class EventEntity extends BaseSqlEntity<Event> implements BaseEntity<Eve
|
|||||||
event.setUid(eventUid);
|
event.setUid(eventUid);
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getTs(UUID uuid) {
|
||||||
|
return (uuid.timestamp() - EPOCH_DIFF) / 1000;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -75,7 +75,8 @@ public abstract class AbstractEventInsertRepository implements EventInsertReposi
|
|||||||
.setParameter("entity_type", entity.getEntityType().name())
|
.setParameter("entity_type", entity.getEntityType().name())
|
||||||
.setParameter("event_type", entity.getEventType())
|
.setParameter("event_type", entity.getEventType())
|
||||||
.setParameter("event_uid", entity.getEventUid())
|
.setParameter("event_uid", entity.getEventUid())
|
||||||
.setParameter("tenant_id", entity.getTenantId());
|
.setParameter("tenant_id", entity.getTenantId())
|
||||||
|
.setParameter("ts", entity.getTs());
|
||||||
}
|
}
|
||||||
|
|
||||||
private EventEntity processSaveOrUpdate(EventEntity entity, String query) {
|
private EventEntity processSaveOrUpdate(EventEntity entity, String query) {
|
||||||
|
|||||||
@ -44,7 +44,7 @@ public class HsqlEventInsertRepository extends AbstractEventInsertRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static String getInsertString(String conflictStatement) {
|
private static String getInsertString(String conflictStatement) {
|
||||||
return "MERGE INTO event USING (VALUES :id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) I (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) ON " + conflictStatement + " WHEN MATCHED THEN UPDATE SET event.id = I.id, event.body = I.body, event.entity_id = I.entity_id, event.entity_type = I.entity_type, event.event_type = I.event_type, event.event_uid = I.event_uid, event.tenant_id = I.tenant_id" +
|
return "MERGE INTO event USING (VALUES :id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id, :ts) I (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) ON " + conflictStatement + " WHEN MATCHED THEN UPDATE SET event.id = I.id, event.body = I.body, event.entity_id = I.entity_id, event.entity_type = I.entity_type, event.event_type = I.event_type, event.event_uid = I.event_uid, event.tenant_id = I.tenant_id, event.ts = I.ts" +
|
||||||
" WHEN NOT MATCHED THEN INSERT (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (I.id, I.body, I.entity_id, I.entity_type, I.event_type, I.event_uid, I.tenant_id)";
|
" WHEN NOT MATCHED THEN INSERT (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) VALUES (I.id, I.body, I.entity_id, I.entity_type, I.event_type, I.event_uid, I.tenant_id, I.ts)";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -48,6 +48,6 @@ public class PsqlEventInsertRepository extends AbstractEventInsertRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static String getInsertOrUpdateString(String eventKeyStatement, String updateKeyStatement) {
|
private static String getInsertOrUpdateString(String eventKeyStatement, String updateKeyStatement) {
|
||||||
return "INSERT INTO event (id, body, entity_id, entity_type, event_type, event_uid, tenant_id) VALUES (:id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id) ON CONFLICT " + eventKeyStatement + " DO UPDATE SET body = :body, " + updateKeyStatement + " returning *";
|
return "INSERT INTO event (id, body, entity_id, entity_type, event_type, event_uid, tenant_id, ts) VALUES (:id, :body, :entity_id, :entity_type, :event_type, :event_uid, :tenant_id, :ts) ON CONFLICT " + eventKeyStatement + " DO UPDATE SET body = :body, ts = :ts," + updateKeyStatement + " returning *";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event (
|
|||||||
event_type varchar(255),
|
event_type varchar(255),
|
||||||
event_uid varchar(255),
|
event_uid varchar(255),
|
||||||
tenant_id varchar(31),
|
tenant_id varchar(31),
|
||||||
|
ts bigint NOT NULL,
|
||||||
CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid)
|
CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid)
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -251,3 +252,4 @@ CREATE TABLE IF NOT EXISTS entity_view (
|
|||||||
search_text varchar(255),
|
search_text varchar(255),
|
||||||
additional_info varchar
|
additional_info varchar
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -144,6 +144,7 @@ CREATE TABLE IF NOT EXISTS event (
|
|||||||
event_type varchar(255),
|
event_type varchar(255),
|
||||||
event_uid varchar(255),
|
event_uid varchar(255),
|
||||||
tenant_id varchar(31),
|
tenant_id varchar(31),
|
||||||
|
ts bigint NOT NULL,
|
||||||
CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid)
|
CONSTRAINT event_unq_key UNIQUE (tenant_id, entity_type, entity_id, event_type, event_uid)
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -251,3 +252,28 @@ CREATE TABLE IF NOT EXISTS entity_view (
|
|||||||
search_text varchar(255),
|
search_text varchar(255),
|
||||||
additional_info varchar
|
additional_info varchar
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE OR REPLACE PROCEDURE cleanup_events_by_ttl(IN ttl bigint, IN debug_ttl bigint, INOUT deleted bigint)
|
||||||
|
LANGUAGE plpgsql AS
|
||||||
|
$$
|
||||||
|
DECLARE
|
||||||
|
ttl_ts bigint;
|
||||||
|
debug_ttl_ts bigint;
|
||||||
|
ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
debug_ttl_deleted_count bigint DEFAULT 0;
|
||||||
|
BEGIN
|
||||||
|
IF ttl > 0 THEN
|
||||||
|
ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - ttl::bigint * 1000)::bigint;
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type != %L::varchar AND event_type != %L::varchar) RETURNING *) SELECT count(*) FROM deleted', ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
IF debug_ttl > 0 THEN
|
||||||
|
debug_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - debug_ttl::bigint * 1000)::bigint;
|
||||||
|
EXECUTE format(
|
||||||
|
'WITH deleted AS (DELETE FROM event WHERE ts < %L::bigint AND (event_type = %L::varchar OR event_type = %L::varchar) RETURNING *) SELECT count(*) FROM deleted', debug_ttl_ts, 'DEBUG_RULE_NODE', 'DEBUG_RULE_CHAIN') into debug_ttl_deleted_count;
|
||||||
|
END IF;
|
||||||
|
RAISE NOTICE 'Events removed by ttl: %', ttl_deleted_count;
|
||||||
|
RAISE NOTICE 'Debug Events removed by ttl: %', debug_ttl_deleted_count;
|
||||||
|
deleted := ttl_deleted_count + debug_ttl_deleted_count;
|
||||||
|
END
|
||||||
|
$$;
|
||||||
|
|||||||
@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings
|
|||||||
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
||||||
);
|
);
|
||||||
|
|
||||||
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000);
|
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
|
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
|
||||||
$$
|
$$
|
||||||
|
|||||||
@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS tb_schema_settings
|
|||||||
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
|
||||||
);
|
);
|
||||||
|
|
||||||
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000);
|
INSERT INTO tb_schema_settings (schema_version) VALUES (2005000) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005000;
|
||||||
|
|
||||||
CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
|
CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
|
||||||
LANGUAGE plpgsql AS
|
LANGUAGE plpgsql AS
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user