diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index 05799fc643..d0ed543b76 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -15,8 +15,13 @@ */ package org.thingsboard.server.service.ttl; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.queue.discovery.PartitionService; import java.sql.Connection; import java.sql.DriverManager; @@ -27,43 +32,12 @@ import java.sql.Statement; @Slf4j +@RequiredArgsConstructor public abstract class AbstractCleanUpService { - @Value("${spring.datasource.url}") - protected String dbUrl; + private final PartitionService partitionService; - @Value("${spring.datasource.username}") - protected String dbUserName; - - @Value("${spring.datasource.password}") - protected String dbPassword; - - protected long executeQuery(Connection conn, String query) throws SQLException { - try (Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery(query)) { - if (log.isDebugEnabled()) { - getWarnings(statement); - } - resultSet.next(); - return resultSet.getLong(1); - } + protected boolean isSystemTenantPartitionMine(){ + return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); } - - protected void getWarnings(Statement statement) throws SQLException { - SQLWarning warnings = statement.getWarnings(); - if (warnings != null) { - log.debug("{}", warnings.getMessage()); - SQLWarning nextWarning = warnings.getNextWarning(); - while (nextWarning != null) { - log.debug("{}", nextWarning.getMessage()); - nextWarning = nextWarning.getNextWarning(); - } - } - } - - protected abstract void doCleanUp(Connection connection) throws SQLException; - - protected Connection getConnection() throws SQLException { - return DriverManager.getConnection(dbUrl, dbUserName, dbPassword); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java similarity index 99% rename from application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java index 3b76a6cbca..051a6c92b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/alarms/AlarmsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AlarmsCleanUpService.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.alarms; +package org.thingsboard.server.service.ttl; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; @Slf4j @RequiredArgsConstructor public class AlarmsCleanUpService { + @Value("${sql.ttl.alarms.removal_batch_size}") private Integer removalBatchSize; diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java similarity index 63% rename from application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java index e93a82c7eb..7aa8881bed 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/edge/EdgeEventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.edge; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -@PsqlDao +@TbCoreComponent @Slf4j @Service public class EdgeEventsCleanUpService extends AbstractCleanUpService { @@ -37,20 +35,20 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.enabled}") private boolean ttlTaskExecutionEnabled; + private final EdgeService edgeService; + + public EdgeEventsCleanUpService(PartitionService partitionService, EdgeService edgeService) { + super(partitionService); + this.edgeService = edgeService; + } + @Scheduled(initialDelayString = "${sql.ttl.edge_events.execution_interval_ms}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + log.info("Going to cleanup old edge events using ttl: {}s", ttl); + long totalEdgeEventsRemoved = edgeService.cleanupEvents(ttl); + log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved); } } - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEdgeEventsRemoved = executeQuery(connection, "call cleanup_edge_events_by_ttl(" + ttl + ", 0);"); - log.info("Total edge events removed by TTL: [{}]", totalEdgeEventsRemoved); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java similarity index 63% rename from application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java index 407c88261f..b244e1b8fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/events/EventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EventsCleanUpService.java @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.events; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.dao.event.EventService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -@PsqlDao +@TbCoreComponent @Slf4j @Service public class EventsCleanUpService extends AbstractCleanUpService { @@ -40,20 +38,20 @@ public class EventsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.events.enabled}") private boolean ttlTaskExecutionEnabled; + private final EventService eventService; + + public EventsCleanUpService(PartitionService partitionService, EventService eventService) { + super(partitionService); + this.eventService = eventService; + } + @Scheduled(initialDelayString = "${sql.ttl.events.execution_interval_ms}", fixedDelayString = "${sql.ttl.events.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + log.info("Going to cleanup old events using debug events ttl: {}s and other events ttl: {}s", debugTtl, ttl); + long totalEventsRemoved = eventService.cleanupEvents(ttl, debugTtl); + log.info("Total events removed by TTL: [{}]", totalEventsRemoved); } } - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEventsRemoved = executeQuery(connection, "call cleanup_events_by_ttl(" + ttl + ", " + debugTtl + ", 0);"); - log.info("Total events removed by TTL: [{}]", totalEventsRemoved); - } } \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java similarity index 65% rename from application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java rename to application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java index ee2d437a22..7094b34e62 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/AbstractTimeseriesCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/TimeseriesCleanUpService.java @@ -13,19 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.ttl.timeseries; +package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.service.ttl.AbstractCleanUpService; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - @Slf4j -public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpService { +public class TimeseriesCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.ts.ts_key_value_ttl}") protected long systemTtl; @@ -33,14 +31,17 @@ public abstract class AbstractTimeseriesCleanUpService extends AbstractCleanUpSe @Value("${sql.ttl.ts.enabled}") private boolean ttlTaskExecutionEnabled; + private final TimeseriesService timeseriesService; + + public TimeseriesCleanUpService(PartitionService partitionService, TimeseriesService timeseriesService) { + super(partitionService); + this.timeseriesService = timeseriesService; + } + @Scheduled(initialDelayString = "${sql.ttl.ts.execution_interval_ms}", fixedDelayString = "${sql.ttl.ts.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled) { - try (Connection conn = getConnection()) { - doCleanUp(conn); - } catch (SQLException e) { - log.error("SQLException occurred during TTL task execution ", e); - } + if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { + timeseriesService.cleanup(systemTtl); } } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java deleted file mode 100644 index 3197f0cbb0..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/PsqlTimeseriesCleanUpService.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.ttl.timeseries; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.util.PsqlDao; -import org.thingsboard.server.dao.util.SqlTsDao; - -import java.sql.Connection; -import java.sql.SQLException; - -@SqlTsDao -@PsqlDao -@Service -@Slf4j -public class PsqlTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { - - @Value("${sql.postgres.ts_key_value_partitioning}") - private String partitionType; - - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalPartitionsRemoved = executeQuery(connection, "call drop_partitions_by_max_ttl('" + partitionType + "'," + systemTtl + ", 0);"); - log.info("Total partitions removed by TTL: [{}]", totalPartitionsRemoved); - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);"); - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); - } -} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java deleted file mode 100644 index 0ed61ef97c..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/ttl/timeseries/TimescaleTimeseriesCleanUpService.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright © 2016-2021 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.ttl.timeseries; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.util.TimescaleDBTsDao; - -import java.sql.Connection; -import java.sql.SQLException; - -@TimescaleDBTsDao -@Service -@Slf4j -public class TimescaleTimeseriesCleanUpService extends AbstractTimeseriesCleanUpService { - - @Override - protected void doCleanUp(Connection connection) throws SQLException { - long totalEntitiesTelemetryRemoved = executeQuery(connection, "call cleanup_timeseries_by_ttl('" + ModelConstants.NULL_UUID + "'," + systemTtl + ", 0);"); - log.info("Total telemetry removed stats by TTL for entities: [{}]", totalEntitiesTelemetryRemoved); - } -} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index a7b9145c01..8567077eb1 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -93,4 +93,6 @@ public interface EdgeService { Object activateInstance(String licenseSecret, String releaseDate); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId); + + long cleanupEvents(long ttl); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java index ea25568375..dce49afdb1 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/event/EventService.java @@ -46,4 +46,6 @@ public interface EventService { void removeEvents(TenantId tenantId, EntityId entityId); + long cleanupEvents(long ttl, long debugTtl); + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 6fcb5ca2ca..b1a2541fc7 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -52,4 +52,6 @@ public interface TimeseriesService { List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List findAllKeysByEntityIds(TenantId tenantId, List entityIds); + + void cleanup(long systemTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java index d254c5ee9a..eda1075cd6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeDao.java @@ -176,4 +176,11 @@ public interface EdgeDao extends Dao { * @return the list of rule chain objects */ ListenableFuture> findEdgesByTenantIdAndDashboardId(UUID tenantId, UUID dashboardId); + + /** + * Executes stored procedure to cleanup old edge events. + * @param ttl the ttl for edge events in seconds + * @return the number of deleted edge events + */ + long cleanupEvents(long ttl); } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 2ff7d94f43..d06ce8d722 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -627,6 +627,11 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic return result.toString(); } + @Override + public long cleanupEvents(long ttl) { + return edgeDao.cleanupEvents(ttl); + } + private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { List result = new ArrayList<>(); PageLink pageLink = new PageLink(DEFAULT_LIMIT); diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index 91bfa954fd..4cfd0a3ffa 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -131,6 +131,11 @@ public class BaseEventService implements EventService { } while (eventPageData.hasNext()); } + @Override + public long cleanupEvents(long ttl, long debugTtl) { + return eventDao.cleanupEvents(ttl, debugTtl); + } + private DataValidator eventValidator = new DataValidator() { @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java index ba4e86c95e..d943937ca3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java @@ -102,4 +102,11 @@ public interface EventDao extends Dao { */ List findLatestEvents(UUID tenantId, EntityId entityId, String eventType, int limit); + /** + * Executes stored procedure to cleanup old events. Uses separate ttl for debug and other events. + * @param otherEventsTtl the ttl for events in seconds + * @param debugEventsTtl the ttl for debug events in seconds + * @return the number of deleted events + */ + long cleanupEvents(long otherEventsTtl, long debugEventsTtl); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index 4431356690..fcd679383f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -15,11 +15,33 @@ */ package org.thingsboard.server.dao.sql; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import javax.sql.DataSource; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; + +@Slf4j public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected JpaExecutorService service; + @Autowired + protected DataSource dataSource; + + protected void printWarnings(Statement statement) throws SQLException { + SQLWarning warnings = statement.getWarnings(); + if (warnings != null) { + log.debug("{}", warnings.getMessage()); + SQLWarning nextWarning = warnings.getNextWarning(); + while (nextWarning != null) { + log.debug("{}", nextWarning.getMessage()); + nextWarning = nextWarning.getNextWarning(); + } + } + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java index f17196fe93..8e7112e81d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaEdgeDao.java @@ -40,6 +40,9 @@ import org.thingsboard.server.dao.model.sql.EdgeInfoEntity; import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -194,6 +197,21 @@ public class JpaEdgeDao extends JpaAbstractSearchTextDao imple return transformFromRelationToEdge(tenantId, relations); } + @Override + public long cleanupEvents(long ttl) { + try { + CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_edge_events_by_ttl(?,?,?)}"); + stmt.setLong(1, ttl); + stmt.registerOutParameter(3, Types.BIGINT); + stmt.executeUpdate(); + printWarnings(stmt); + return stmt.getLong(3); + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + return 0; + } + } + private ListenableFuture> transformFromRelationToEdge(UUID tenantId, ListenableFuture> relations) { return Futures.transformAsync(relations, input -> { List> edgeFutures = new ArrayList<>(input.size()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index 44848ec515..edaf95a29d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.event.DebugEvent; import org.thingsboard.server.common.data.event.ErrorEventFilter; import org.thingsboard.server.common.data.event.EventFilter; -import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.common.data.event.LifeCycleEventFilter; import org.thingsboard.server.common.data.event.StatisticsEventFilter; import org.thingsboard.server.common.data.id.EntityId; @@ -40,6 +39,12 @@ import org.thingsboard.server.dao.event.EventDao; import org.thingsboard.server.dao.model.sql.EventEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; +import javax.sql.DataSource; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.sql.Types; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -256,6 +261,22 @@ public class JpaBaseEventDao extends JpaAbstractDao implemen return DaoUtil.convertDataList(latest); } + @Override + public long cleanupEvents(long otherEventsTtl, long debugEventsTtl) { + try { + CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_events_by_ttl(?,?,?)}"); + stmt.setLong(1, otherEventsTtl); + stmt.setLong(2, debugEventsTtl); + stmt.registerOutParameter(3, Types.BIGINT); + stmt.executeUpdate(); + printWarnings(stmt); + return stmt.getLong(3); + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + return 0; + } + } + public Optional save(EventEntity entity, boolean ifNotExists) { log.debug("Save event [{}] ", entity); if (entity.getTenantId() == null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index f6a6b56be5..abd8f15e77 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -25,9 +25,13 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import javax.annotation.Nullable; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -62,6 +66,21 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries @Value("${sql.ttl.ts.ts_key_value_ttl:0}") private long systemTtl; + public void cleanup(long systemTtl) { + try { + log.info("Going to cleanup old timeseries data using ttl: {}s", systemTtl); + CallableStatement stmt = dataSource.getConnection().prepareCall("{call cleanup_timeseries_by_ttl(?,?,?)}"); + stmt.setObject(1, ModelConstants.NULL_UUID); + stmt.setLong(2, systemTtl); + stmt.registerOutParameter(3, Types.BIGINT); + stmt.executeUpdate(); + printWarnings(stmt); + log.info("Total telemetry removed stats by TTL for entities: [{}]", stmt.getLong(3)); + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + } + } + protected ListenableFuture> processFindAllAsync(TenantId tenantId, EntityId entityId, List queries) { List>> futures = queries .stream() diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java index c01d91ec61..c8241714f4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/hsql/JpaHsqlTimeseriesDao.java @@ -54,4 +54,9 @@ public class JpaHsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); } + @Override + public void cleanup(long systemTtl) { + + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java index 64c074fd40..594f77ae78 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/psql/JpaPsqlTimeseriesDao.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.sqlts.AbstractChunkedAggregationTimeseriesDao; import org.thingsboard.server.dao.sqlts.insert.psql.PsqlPartitioningRepository; @@ -35,6 +36,9 @@ import org.thingsboard.server.dao.timeseries.SqlTsPartitionDate; import org.thingsboard.server.dao.util.PsqlDao; import org.thingsboard.server.dao.util.SqlTsDao; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -62,6 +66,7 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa @Value("${sql.postgres.ts_key_value_partitioning:MONTHS}") private String partitioning; + @Override protected void init() { super.init(); @@ -93,6 +98,27 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa return Futures.transform(tsQueue.add(entity), v -> dataPointDays, MoreExecutors.directExecutor()); } + @Override + public void cleanup(long systemTtl) { + cleanupPartitions(systemTtl); + super.cleanup(systemTtl); + } + + private void cleanupPartitions(long systemTtl) { + try { + log.info("Going to cleanup old timeseries data partitions using partition type: {} and ttl: {}s", partitioning, systemTtl); + CallableStatement stmt = dataSource.getConnection().prepareCall("{call drop_partitions_by_max_ttl(?,?,?)}"); + stmt.setObject(1, partitioning); + stmt.setLong(2, systemTtl); + stmt.registerOutParameter(3, Types.BIGINT); + stmt.executeUpdate(); + printWarnings(stmt); + log.info("Total partitions removed by TTL: [{}]", stmt.getLong(3)); + } catch (SQLException e) { + log.error("SQLException occurred during TTL task execution ", e); + } + } + private void savePartitionIfNotExist(long ts) { if (!tsFormat.equals(SqlTsPartitionDate.INDEFINITE) && ts >= 0) { LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 7f798ddb4b..31f3407fbf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; import org.thingsboard.server.dao.model.sqlts.timescale.ts.TimescaleTsKvEntity; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; @@ -45,6 +46,9 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.sql.CallableStatement; +import java.sql.SQLException; +import java.sql.Types; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -156,6 +160,11 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } } + @Override + public void cleanup(long systemTtl) { + super.cleanup(systemTtl); + } + private ListenableFuture> findAllAsyncWithLimit(EntityId entityId, ReadTsKvQuery query) { String strKey = query.getKey(); Integer keyId = getOrSaveKeyId(strKey); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 701c67a648..fb15af723a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -126,6 +126,11 @@ public class BaseTimeseriesService implements TimeseriesService { return timeseriesLatestDao.findAllKeysByEntityIds(tenantId, entityIds); } + @Override + public void cleanup(long systemTtl) { + timeseriesDao.cleanup(systemTtl); + } + @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { validate(entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 240d5a0b88..ce653e2e6e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -288,6 +288,11 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD } } + @Override + public void cleanup(long systemTtl) { + //Cleanup by TTL is native for Cassandra + } + private ListenableFuture> findAllAsyncWithLimit(TenantId tenantId, EntityId entityId, ReadTsKvQuery query) { long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java index 3b3eb4ee0a..e9af5f0b75 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesDao.java @@ -38,4 +38,6 @@ public interface TimeseriesDao { ListenableFuture remove(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); ListenableFuture removePartition(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); + + void cleanup(long systemTtl); }