Refactoring of the ttl cleanup services

This commit is contained in:
Andrii Shvaika 2021-06-03 19:32:25 +03:00
parent 04e218166c
commit 083dc38cf8
24 changed files with 219 additions and 165 deletions

View File

@ -15,8 +15,13 @@
*/ */
package org.thingsboard.server.service.ttl; package org.thingsboard.server.service.ttl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
@ -27,43 +32,12 @@ import java.sql.Statement;
@Slf4j @Slf4j
@RequiredArgsConstructor
public abstract class AbstractCleanUpService { public abstract class AbstractCleanUpService {
@Value("${spring.datasource.url}") private final PartitionService partitionService;
protected String dbUrl;
@Value("${spring.datasource.username}") protected boolean isSystemTenantPartitionMine(){
protected String dbUserName; return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
@Value("${spring.datasource.password}")
protected String dbPassword;
protected long executeQuery(Connection conn, String query) throws SQLException {
try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) {
if (log.isDebugEnabled()) {
getWarnings(statement);
}
resultSet.next();
return resultSet.getLong(1);
}
} }
protected void getWarnings(Statement statement) throws SQLException {
SQLWarning warnings = statement.getWarnings();
if (warnings != null) {
log.debug("{}", warnings.getMessage());
SQLWarning nextWarning = warnings.getNextWarning();
while (nextWarning != null) {
log.debug("{}", nextWarning.getMessage());
nextWarning = nextWarning.getNextWarning();
}
}
}
protected abstract void doCleanUp(Connection connection) throws SQLException;
protected Connection getConnection() throws SQLException {
return DriverManager.getConnection(dbUrl, dbUserName, dbPassword);
}
} }

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.ttl.alarms; package org.thingsboard.server.service.ttl;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class AlarmsCleanUpService { public class AlarmsCleanUpService {
@Value("${sql.ttl.alarms.removal_batch_size}") @Value("${sql.ttl.alarms.removal_batch_size}")
private Integer removalBatchSize; private Integer removalBatchSize;

View File

@ -13,20 +13,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.ttl.edge; package org.thingsboard.server.service.ttl;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ttl.AbstractCleanUpService; import org.thingsboard.server.service.ttl.AbstractCleanUpService;
import java.sql.Connection; @TbCoreComponent
import java.sql.DriverManager;
import java.sql.SQLException;
@PsqlDao
@Slf4j @Slf4j
@Service @Service
public class EdgeEventsCleanUpService extends AbstractCleanUpService { public class EdgeEventsCleanUpService extends AbstractCleanUpService {
@ -37,20 +35,20 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
@Value("${sql.ttl.edge_events.enabled}") @Value("${sql.ttl.edge_events.enabled}")
private boolean ttlTaskExecutionEnabled; private boolean ttlTaskExecutionEnabled;
private final EdgeService edgeService;
public EdgeEventsCleanUpService(PartitionService partitionService, EdgeService edgeService) {
super(partitionService);
this.edgeService = edgeService;
}
@Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
public void cleanUp() { public void cleanUp() {
if (ttlTaskExecutionEnabled) { if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
try (Connection conn = getConnection()) { log.info("Going to cleanup old edge events using ttl: {}s", ttl);
doCleanUp(conn); long totalEdgeEventsRemoved = edgeService.cleanupEvents(ttl);
} catch (SQLException e) { log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);
log.error("SQLException occurred during TTL task execution ", e);
}
} }
} }
@Override
protected void doCleanUp(Connection connection) throws SQLException {
long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);");
log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved);
}
} }

View File

@ -13,20 +13,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.ttl.events; package org.thingsboard.server.service.ttl;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.ttl.AbstractCleanUpService; import org.thingsboard.server.service.ttl.AbstractCleanUpService;
import java.sql.Connection; @TbCoreComponent
import java.sql.DriverManager;
import java.sql.SQLException;
@PsqlDao
@Slf4j @Slf4j
@Service @Service
public class EventsCleanUpService extends AbstractCleanUpService { public class EventsCleanUpService extends AbstractCleanUpService {
@ -40,20 +38,20 @@ public class EventsCleanUpService extends AbstractCleanUpService {
@Value("${sql.ttl.events.enabled}") @Value("${sql.ttl.events.enabled}")
private boolean ttlTaskExecutionEnabled; private boolean ttlTaskExecutionEnabled;
private final EventService eventService;
public EventsCleanUpService(PartitionService partitionService, EventService eventService) {
super(partitionService);
this.eventService = eventService;
}
@Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}")
public void cleanUp() { public void cleanUp() {
if (ttlTaskExecutionEnabled) { if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
try (Connection conn = getConnection()) { log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugTtl, ttl);
doCleanUp(conn); long totalEventsRemoved = eventService.cleanupEvents(ttl, debugTtl);
} catch (SQLException e) { log.info("Total events removed by TTL: [{}]", totalEventsRemoved);
log.error("SQLException occurred during TTL task execution ", e);
}
} }
} }
@Override
protected void doCleanUp(Connection connection) throws SQLException {
long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);");
log.info("Total events removed by TTL: [{}]", totalEventsRemoved);
}
} }

View File

@ -13,19 +13,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.ttl.timeseries; package org.thingsboard.server.service.ttl;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.service.ttl.AbstractCleanUpService; import org.thingsboard.server.service.ttl.AbstractCleanUpService;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@Slf4j @Slf4j
public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService { public class TimeseriesCleanUpService extends AbstractCleanUpService {
@Value("${sql.ttl.ts.ts_key_value_ttl}") @Value("${sql.ttl.ts.ts_key_value_ttl}")
protected long systemTtl; protected long systemTtl;
@ -33,14 +31,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe
@Value("${sql.ttl.ts.enabled}") @Value("${sql.ttl.ts.enabled}")
private boolean ttlTaskExecutionEnabled; private boolean ttlTaskExecutionEnabled;
private final TimeseriesService timeseriesService;
public TimeseriesCleanUpService(PartitionService partitionService, TimeseriesService timeseriesService) {
super(partitionService);
this.timeseriesService = timeseriesService;
}
@Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}") @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}")
public void cleanUp() { public void cleanUp() {
if (ttlTaskExecutionEnabled) { if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
try (Connection conn = getConnection()) { timeseriesService.cleanup(systemTtl);
doCleanUp(conn);
} catch (SQLException e) {
log.error("SQLException occurred during TTL task execution ", e);
}
} }
} }

View File

@ -1,44 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
import java.sql.Connection;
import java.sql.SQLException;
@SqlTsDao
@PsqlDao
@Service
@Slf4j
public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
@Value("${sql.postgres.ts_key_value_partitioning}")
private String partitionType;
@Override
protected void doCleanUp(Connection connection) throws SQLException {
long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);");
log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved);
long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");
log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
}
}

View File

@ -1,36 +0,0 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ttl.timeseries;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import java.sql.Connection;
import java.sql.SQLException;
@TimescaleDBTsDao
@Service
@Slf4j
public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService {
@Override
protected void doCleanUp(Connection connection) throws SQLException {
long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);");
log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved);
}
}

View File

@ -93,4 +93,6 @@ public interface EdgeService {
Object activateInstance(String licenseSecret, String releaseDate); Object activateInstance(String licenseSecret, String releaseDate);
String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId);
long cleanupEvents(long ttl);
} }

View File

@ -46,4 +46,6 @@ public interface EventService {
void removeEvents(TenantId tenantId, EntityId entityId); void removeEvents(TenantId tenantId, EntityId entityId);
long cleanupEvents(long ttl, long debugTtl);
} }

View File

@ -52,4 +52,6 @@ public interface TimeseriesService {
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds); List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);
void cleanup(long systemTtl);
} }

View File

@ -176,4 +176,11 @@ public interface EdgeDao extends Dao<Edge> {
* @return the list of rule chain objects * @return the list of rule chain objects
*/ */
ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId); ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId);
/**
* Executes stored procedure to cleanup old edge events.
* @param ttl the ttl for edge events in seconds
* @return the number of deleted edge events
*/
long cleanupEvents(long ttl);
} }

View File

@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
return result.toString(); return result.toString();
} }
@Override
public long cleanupEvents(long ttl) {
return edgeDao.cleanupEvents(ttl);
}
private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
List<RuleChain> result = new ArrayList<>(); List<RuleChain> result = new ArrayList<>();
PageLink pageLink = new PageLink(DEFAULT_LIMIT); PageLink pageLink = new PageLink(DEFAULT_LIMIT);

View File

@ -131,6 +131,11 @@ public class BaseEventService implements EventService {
} while (eventPageData.hasNext()); } while (eventPageData.hasNext());
} }
@Override
public long cleanupEvents(long ttl, long debugTtl) {
return eventDao.cleanupEvents(ttl, debugTtl);
}
private DataValidator<Event> eventValidator = private DataValidator<Event> eventValidator =
new DataValidator<Event>() { new DataValidator<Event>() {
@Override @Override

View File

@ -102,4 +102,11 @@ public interface EventDao extends Dao<Event> {
*/ */
List<Event> findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit); List<Event> findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit);
/**
* Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events.
* @param otherEventsTtl the ttl for events in seconds
* @param debugEventsTtl the ttl for debug events in seconds
* @return the number of deleted events
*/
long cleanupEvents(long otherEventsTtl, long debugEventsTtl);
} }

View File

@ -15,11 +15,33 @@
*/ */
package org.thingsboard.server.dao.sql; package org.thingsboard.server.dao.sql;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
@Slf4j
public abstract class JpaAbstractDaoListeningExecutorService { public abstract class JpaAbstractDaoListeningExecutorService {
@Autowired @Autowired
protected JpaExecutorService service; protected JpaExecutorService service;
@Autowired
protected DataSource dataSource;
protected void printWarnings(Statement statement) throws SQLException {
SQLWarning warnings = statement.getWarnings();
if (warnings != null) {
log.debug("{}", warnings.getMessage());
SQLWarning nextWarning = warnings.getNextWarning();
while (nextWarning != null) {
log.debug("{}", nextWarning.getMessage());
nextWarning = nextWarning.getNextWarning();
}
}
}
} }

View File

@ -40,6 +40,9 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity;
import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.relation.RelationDao;
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -194,6 +197,21 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao<EdgeEntity, Edge> imple
return transformFromRelationToEdge(tenantId, relations); return transformFromRelationToEdge(tenantId, relations);
} }
@Override
public long cleanupEvents(long ttl) {
try {
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_edge_events_by_ttl(?,?,?)}");
stmt.setLong(1, ttl);
stmt.registerOutParameter(3, Types.BIGINT);
stmt.executeUpdate();
printWarnings(stmt);
return stmt.getLong(3);
} catch (SQLException e) {
log.error("SQLException occurred during TTL task execution ", e);
return 0;
}
}
private ListenableFuture<List<Edge>> transformFromRelationToEdge(UUID tenantId, ListenableFuture<List<EntityRelation>> relations) { private ListenableFuture<List<Edge>> transformFromRelationToEdge(UUID tenantId, ListenableFuture<List<EntityRelation>> relations) {
return Futures.transformAsync(relations, input -> { return Futures.transformAsync(relations, input -> {
List<ListenableFuture<Edge>> edgeFutures = new ArrayList<>(input.size()); List<ListenableFuture<Edge>> edgeFutures = new ArrayList<>(input.size());

View File

@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.event.DebugEvent; import org.thingsboard.server.common.data.event.DebugEvent;
import org.thingsboard.server.common.data.event.ErrorEventFilter; import org.thingsboard.server.common.data.event.ErrorEventFilter;
import org.thingsboard.server.common.data.event.EventFilter; import org.thingsboard.server.common.data.event.EventFilter;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.event.LifeCycleEventFilter; import org.thingsboard.server.common.data.event.LifeCycleEventFilter;
import org.thingsboard.server.common.data.event.StatisticsEventFilter; import org.thingsboard.server.common.data.event.StatisticsEventFilter;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
@ -40,6 +39,12 @@ import org.thingsboard.server.dao.event.EventDao;
import org.thingsboard.server.dao.model.sql.EventEntity; import org.thingsboard.server.dao.model.sql.EventEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDao; import org.thingsboard.server.dao.sql.JpaAbstractDao;
import javax.sql.DataSource;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.sql.Types;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -256,6 +261,22 @@ public class JpaBaseEventDao extends JpaAbstractDao<EventEntity, Event> implemen
return DaoUtil.convertDataList(latest); return DaoUtil.convertDataList(latest);
} }
@Override
public long cleanupEvents(long otherEventsTtl, long debugEventsTtl) {
try {
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_events_by_ttl(?,?,?)}");
stmt.setLong(1, otherEventsTtl);
stmt.setLong(2, debugEventsTtl);
stmt.registerOutParameter(3, Types.BIGINT);
stmt.executeUpdate();
printWarnings(stmt);
return stmt.getLong(3);
} catch (SQLException e) {
log.error("SQLException occurred during TTL task execution ", e);
return 0;
}
}
public Optional<Event> save(EventEntity entity, boolean ifNotExists) { public Optional<Event> save(EventEntity entity, boolean ifNotExists) {
log.debug("Save event [{}] ", entity); log.debug("Save event [{}] ", entity);
if (entity.getTenantId() == null) { if (entity.getTenantId() == null) {

View File

@ -25,9 +25,13 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,6 +66,21 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
@Value("${sql.ttl.ts.ts_key_value_ttl:0}") @Value("${sql.ttl.ts.ts_key_value_ttl:0}")
private long systemTtl; private long systemTtl;
public void cleanup(long systemTtl) {
try {
log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl);
CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_timeseries_by_ttl(?,?,?)}");
stmt.setObject(1, ModelConstants.NULL_UUID);
stmt.setLong(2, systemTtl);
stmt.registerOutParameter(3, Types.BIGINT);
stmt.executeUpdate();
printWarnings(stmt);
log.info("Total telemetry removed stats by TTL for entities: [{}]", stmt.getLong(3));
} catch (SQLException e) {
log.error("SQLException occurred during TTL task execution ", e);
}
}
protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) { protected ListenableFuture<List<TsKvEntry>> processFindAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
List<ListenableFuture<List<TsKvEntry>>> futures = queries List<ListenableFuture<List<TsKvEntry>>> futures = queries
.stream() .stream()

View File

@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
} }
@Override
public void cleanup(long systemTtl) {
}
} }

View File

@ -27,6 +27,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao;
import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository; import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository;
@ -35,6 +36,9 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate;
import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao; import org.thingsboard.server.dao.util.SqlTsDao;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
@Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}")
private String partitioning; private String partitioning;
@Override @Override
protected void init() { protected void init() {
super.init(); super.init();
@ -93,6 +98,27 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor());
} }
@Override
public void cleanup(long systemTtl) {
cleanupPartitions(systemTtl);
super.cleanup(systemTtl);
}
private void cleanupPartitions(long systemTtl) {
try {
log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl);
CallableStatement stmt = dataSource.getConnection().prepareCall("{call drop_partitions_by_max_ttl(?,?,?)}");
stmt.setObject(1, partitioning);
stmt.setLong(2, systemTtl);
stmt.registerOutParameter(3, Types.BIGINT);
stmt.executeUpdate();
printWarnings(stmt);
log.info("Total partitions removed by TTL: [{}]", stmt.getLong(3));
} catch (SQLException e) {
log.error("SQLException occurred during TTL task execution ", e);
}
}
private void savePartitionIfNotExist(long ts) { private void savePartitionIfNotExist(long ts) {
if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) { if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) {
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Function; import java.util.function.Function;
@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
} }
} }
@Override
public void cleanup(long systemTtl) {
super.cleanup(systemTtl);
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) {
String strKey = query.getKey(); String strKey = query.getKey();
Integer keyId = getOrSaveKeyId(strKey); Integer keyId = getOrSaveKeyId(strKey);

View File

@ -126,6 +126,11 @@ public class BaseTimeseriesService implements TimeseriesService {
return timeseriesLatestDao.findAllKeysByEntityIds(tenantId, entityIds); return timeseriesLatestDao.findAllKeysByEntityIds(tenantId, entityIds);
} }
@Override
public void cleanup(long systemTtl) {
timeseriesDao.cleanup(systemTtl);
}
@Override @Override
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
validate(entityId); validate(entityId);

View File

@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
} }
} }
@Override
public void cleanup(long systemTtl) {
//Cleanup by TTL is native for Cassandra
}
private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { private ListenableFuture<List<TsKvEntry>> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs()); long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs()); long maxPartition = toPartitionTs(query.getEndTs());

View File

@ -38,4 +38,6 @@ public interface TimeseriesDao {
ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); ListenableFuture<Void> remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); ListenableFuture<Void> removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
void cleanup(long systemTtl);
} }