From 11ee41bf341dd5500b2b91bd2928ce34cd44893f Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 7 Nov 2022 14:42:49 +0200 Subject: [PATCH] Adding partition to edge_event table --- .../main/data/upgrade/3.4.1/schema_update.sql | 8 +- .../main/data/upgrade/3.4.2/schema_update.sql | 76 ++++++++++++++++ .../install/ThingsboardInstallService.java | 7 +- .../install/SqlDatabaseUpgradeService.java | 12 +++ .../update/DefaultDataUpdateService.java | 22 +++++ .../service/ttl/EdgeEventsCleanUpService.java | 25 +++++- .../src/main/resources/thingsboard.yml | 1 + .../BaseAuditLogControllerTest.java | 1 - .../BaseEdgeEventControllerTest.java | 69 ++++++++++++++ .../resources/application-test.properties | 5 +- .../server/dao/edge/BaseEdgeEventService.java | 12 +-- .../server/dao/edge/EdgeEventDao.java | 2 + .../dao/sql/edge/JpaBaseEdgeEventDao.java | 89 ++++++++++++------- .../resources/sql/schema-entities-idx.sql | 2 + .../main/resources/sql/schema-entities.sql | 4 +- 15 files changed, 285 insertions(+), 50 deletions(-) create mode 100644 application/src/main/data/upgrade/3.4.2/schema_update.sql diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql index 5891246b44..66169f0120 100644 --- a/application/src/main/data/upgrade/3.4.1/schema_update.sql +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -55,16 +55,16 @@ CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_t LANGUAGE plpgsql AS $$ DECLARE - p RECORD; + prt RECORD; partition_end_ts BIGINT; BEGIN FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log WHERE created_time >= start_time_ms AND created_time < end_time_ms LOOP - partition_end_ts = p.partition_ts + partition_size_ms; - RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + partition_end_ts = prt.partition_ts + partition_size_ms; + RAISE NOTICE '[audit_log] Partition to create : [%-%]', prt.partition_ts, partition_end_ts; EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' || - 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + 'FOR VALUES FROM ( %s ) TO ( %s )', prt.partition_ts, prt.partition_ts, partition_end_ts); END LOOP; INSERT INTO audit_log diff --git a/application/src/main/data/upgrade/3.4.2/schema_update.sql b/application/src/main/data/upgrade/3.4.2/schema_update.sql new file mode 100644 index 0000000000..10c009b740 --- /dev/null +++ b/application/src/main/data/upgrade/3.4.2/schema_update.sql @@ -0,0 +1,76 @@ +-- +-- Copyright © 2016-2022 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +DO +$$ + DECLARE table_partition RECORD; + BEGIN + -- in case of running the upgrade script a second time: + IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN + ALTER TABLE edge_event RENAME TO old_edge_event; + ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time; + + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'edge_event_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts); + END LOOP; + ELSE + RAISE NOTICE 'Table old_edge_event already exists, leaving as is'; + END IF; + END; +$$; + + +CREATE TABLE IF NOT EXISTS edge_event ( + id uuid NOT NULL, + created_time bigint NOT NULL, + edge_id uuid, + edge_event_type varchar(255), + edge_event_uid varchar(255), + entity_id uuid, + edge_event_action varchar(255), + body varchar(10000000), + tenant_id uuid, + ts bigint NOT NULL + ) PARTITION BY RANGE (created_time); +CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC); + + +CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) + LANGUAGE plpgsql AS +$$ +DECLARE + p RECORD; + partition_end_ts BIGINT; +BEGIN + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event + WHERE created_time >= start_time_ms AND created_time < end_time_ms + LOOP + partition_end_ts = p.partition_ts + partition_size_ms; + RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' || + 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + END LOOP; + + INSERT INTO edge_event + SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts + FROM old_edge_event + WHERE created_time >= start_time_ms AND created_time < end_time_ms; +END; +$$; + + diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 33a6d4bdd1..5fd8c47997 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -230,9 +230,14 @@ public class ThingsboardInstallService { databaseEntitiesUpgradeService.upgradeDatabase("3.4.1"); dataUpdateService.updateData("3.4.1"); log.info("Updating system data..."); + break; + case "3.4.2": + log.info("Upgrading ThingsBoard from version 3.4.2 to 3.5.0 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("3.4.2"); + dataUpdateService.updateData("3.4.2"); + log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; - //TODO update CacheCleanupService on the next version upgrade default: diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 092726d0c8..c7ccf4cd18 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -654,6 +654,18 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.error("Failed updating schema!!!", e); } break; + case "3.4.2": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + log.info("Updating schema ..."); + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.4.2", SCHEMA_UPDATE_SQL); + loadSql(schemaUpdateFile, conn); + log.info("Updating schema settings..."); + conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3005000;"); + log.info("Schema updated."); + } catch (Exception e) { + log.error("Failed updating schema!!!", e); + } + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index a8cf5374a2..68c1c50140 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.edge.EdgeEventDao; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; @@ -142,6 +143,9 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private AuditLogDao auditLogDao; + @Autowired + private EdgeEventDao edgeEventDao; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -189,6 +193,24 @@ public class DefaultDataUpdateService implements DataUpdateService { } else { log.info("Skipping audit logs migration"); } + boolean skipEdgeEventsMigrationTemp = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); + if (!skipEdgeEventsMigrationTemp) { + log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true"); + edgeEventDao.migrateEdgeEvents(); + } else { + log.info("Skipping edge events migration"); + } + break; + case "3.5.0": + boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); + if (!skipEdgeEventsMigration) { + log.info("Updating data from version 3.4.2 to 3.5.0 ..."); + log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true"); + edgeEventDao.migrateEdgeEvents(); + } else { + log.info("Skipping edge events migration"); + } break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java index c4ce4e49b2..880d3eb125 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java @@ -17,15 +17,23 @@ package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.edge.EdgeEventDao; import org.thingsboard.server.dao.edge.EdgeEventService; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME; + @TbCoreComponent @Slf4j @Service +@ConditionalOnExpression("${sql.ttl.edge_events.enabled:true} && ${sql.ttl.edge_events.edge_event_ttl:0} > 0") public class EdgeEventsCleanUpService extends AbstractCleanUpService { public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION = @@ -34,20 +42,29 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService { @Value("${sql.ttl.edge_events.edge_events_ttl}") private long ttl; - @Value("${sql.ttl.edge_events.enabled}") + @Value("${sql.edge_events.partition_size:168}") + private int partitionSizeInHours; + + @Value("${sql.ttl.edge_events.enabled:true}") private boolean ttlTaskExecutionEnabled; private final EdgeEventService edgeEventService; - public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService) { + private final SqlPartitioningRepository partitioningRepository; + + public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService, SqlPartitioningRepository partitioningRepository) { super(partitionService); this.edgeEventService = edgeEventService; + this.partitioningRepository = partitioningRepository; } @Scheduled(initialDelayString = RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") public void cleanUp() { - if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) { - edgeEventService.cleanupEvents(ttl); + long edgeEventsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttl); + if(isSystemTenantPartitionMine()) { + edgeEventService.cleanupEvents(edgeEventsExpTime); + } else { + partitioningRepository.cleanupPartitionsCache(EDGE_EVENT_COLUMN_FAMILY_NAME, edgeEventsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 66a603a1a1..2a8ea20611 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -265,6 +265,7 @@ sql: batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}" batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}" + partition_size: "${SQL_EDGE_EVENTS_PARTITION_SIZE_HOURS:168}" # Number of hours to partition the events. The current value corresponds to one week. audit_logs: partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java index c46f7d9c46..4f8d07864b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java @@ -178,7 +178,6 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest reset(partitioningRepository); AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId); verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs)); - List partitions = partitioningRepository.fetchPartitions("audit_log"); assertThat(partitions).singleElement().satisfies(partitionStartTs -> { assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs)); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseEdgeEventControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseEdgeEventControllerTest.java index ef66270130..ccf218c132 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseEdgeEventControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseEdgeEventControllerTest.java @@ -22,6 +22,9 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; @@ -29,16 +32,27 @@ import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.dao.edge.EdgeEventDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.service.ttl.EdgeEventsCleanUpService; +import java.time.LocalDate; +import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.assertj.core.api.Assertions.assertThat; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @TestPropertySource(properties = { @@ -50,6 +64,18 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest private Tenant savedTenant; private User tenantAdmin; + @Autowired + private EdgeEventDao edgeEventDao; + @SpyBean + private SqlPartitioningRepository partitioningRepository; + @Autowired + private EdgeEventsCleanUpService edgeEventsCleanUpService; + + @Value("#{${sql.edge_events.partition_size} * 60 * 60 * 1000}") + private long partitionDurationInMs; + @Value("${sql.ttl.edge_events.edge_event_ttl}") + private long edgeEventTtlInSec; + @Before public void beforeTest() throws Exception { loginSysAdmin(); @@ -114,6 +140,34 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest Assert.assertTrue(edgeEvents.stream().anyMatch(ee -> EdgeEventType.RELATION.equals(ee.getType()))); } + @Test + public void saveEdgeEvent_thenCreatePartitionIfNotExist() { + reset(partitioningRepository); + EdgeEvent edgeEvent = createEdgeEvent(); + verify(partitioningRepository).createPartitionIfNotExists(eq("edge_event"), eq(edgeEvent.getCreatedTime()), eq(partitionDurationInMs)); + List partitions = partitioningRepository.fetchPartitions("edge_event"); + assertThat(partitions).singleElement().satisfies(partitionStartTs -> { + assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(edgeEvent.getCreatedTime(), partitionDurationInMs)); + }); + } + + @Test + public void cleanUpEdgeEventByTtl_dropOldPartitions() { + long oldEdgeEventTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(); + long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldEdgeEventTs, partitionDurationInMs); + partitioningRepository.createPartitionIfNotExists("edge_event", oldEdgeEventTs, partitionDurationInMs); + List partitions = partitioningRepository.fetchPartitions("edge_event"); + assertThat(partitions).contains(partitionStartTs); + + edgeEventsCleanUpService.cleanUp(); + partitions = partitioningRepository.fetchPartitions("edge_event"); + assertThat(partitions).doesNotContain(partitionStartTs); + assertThat(partitions).allSatisfy(partitionsStart -> { + long partitionEndTs = partitionsStart + partitionDurationInMs; + assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edgeEventTtlInSec)); + }); + } + private List findEdgeEvents(EdgeId edgeId) throws Exception { return doGetTypedWithTimePageLink("/api/edge/" + edgeId.toString() + "/events?", new TypeReference>() { @@ -134,4 +188,19 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest return asset; } + private EdgeEvent createEdgeEvent() { + EdgeEvent edgeEvent = new EdgeEvent(); + edgeEvent.setCreatedTime(System.currentTimeMillis()); + edgeEvent.setTenantId(tenantId); + edgeEvent.setAction(EdgeEventActionType.ADDED); + edgeEvent.setEntityId(tenantAdmin.getUuidId()); + edgeEvent.setType(EdgeEventType.ALARM); + try { + edgeEventDao.saveAsync(edgeEvent).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return edgeEvent; + } + } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index eacd1733d3..fb2fdfe0a4 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -59,4 +59,7 @@ queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 usage.stats.report.enabled=false sql.audit_logs.partition_size=24 -sql.ttl.audit_logs.ttl=2592000 \ No newline at end of file +sql.ttl.audit_logs.ttl=2592000 + +sql.edge_events.partition_size=168 +sql.ttl.edge_events.edge_event_ttl=2592000 \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java index f9e94af613..09322aa555 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.edge; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; @@ -30,11 +29,14 @@ import org.thingsboard.server.dao.service.DataValidator; @Slf4j public class BaseEdgeEventService implements EdgeEventService { - @Autowired - private EdgeEventDao edgeEventDao; + private final EdgeEventDao edgeEventDao; - @Autowired - private DataValidator edgeEventValidator; + private final DataValidator edgeEventValidator; + + public BaseEdgeEventService(EdgeEventDao edgeEventDao, DataValidator edgeEventValidator) { + this.edgeEventDao = edgeEventDao; + this.edgeEventValidator = edgeEventValidator; + } @Override public ListenableFuture saveAsync(EdgeEvent edgeEvent) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java index 7a43d6c065..cb31869213 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java @@ -54,4 +54,6 @@ public interface EdgeEventDao extends Dao { */ void cleanupEvents(long ttl); + void migrateEdgeEvents(); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index df182659bf..334d5a5671 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -17,10 +17,11 @@ package org.thingsboard.server.dao.sql.edge; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -31,19 +32,17 @@ import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.edge.EdgeEventDao; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.EdgeEventEntity; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import org.thingsboard.server.dao.util.SqlDao; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Comparator; import java.util.Objects; import java.util.UUID; @@ -51,19 +50,25 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; - -@Slf4j @Component @SqlDao +@RequiredArgsConstructor +@Slf4j public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao implements EdgeEventDao { private final UUID systemTenantId = NULL_UUID; - @Autowired - ScheduledLogExecutorComponent logExecutor; + private final ScheduledLogExecutorComponent logExecutor; - @Autowired - private StatsFactory statsFactory; + private final StatsFactory statsFactory; + + private final EdgeEventRepository edgeEventRepository; + + private final EdgeEventInsertRepository edgeEventInsertRepository; + + private final SqlPartitioningRepository partitioningRepository; + + private final JdbcTemplate jdbcTemplate; @Value("${sql.edge_events.batch_size:1000}") private int batchSize; @@ -74,14 +79,15 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao queue; - @Autowired - private EdgeEventRepository edgeEventRepository; - - @Autowired - private EdgeEventInsertRepository edgeEventInsertRepository; - @Override protected Class getEntityClass() { return EdgeEventEntity.class; @@ -140,6 +146,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edge_events_ttl) : 1480982400000L; + + long currentTime = System.currentTimeMillis(); + var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); + long numberOfPartitions = (currentTime - startTime) / partitionStepInMs; + + if (numberOfPartitions > 1000) { + String error = "Please adjust your edge event partitioning configuration. Configuration with partition size " + + "of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " + + "(> 1000) partitions which is not recommended!"; + log.error(error); + throw new RuntimeException(error); + } + + while (startTime < currentTime) { + var endTime = startTime + partitionStepInMs; + log.info("Migrating edge event for time period: {} - {}", startTime, endTime); + callMigrationFunction(startTime, endTime, partitionStepInMs); + startTime = endTime; + } + log.info("Event edge migration finished"); + + jdbcTemplate.execute("DROP TABLE IF EXISTS old_edge_event"); + } + + private void callMigrationFunction(long startTime, long endTime, long partitionSIzeInMs) { + jdbcTemplate.update("CALL migrate_edge_event(?, ?, ?)", startTime, endTime, partitionSIzeInMs); + } + } diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 34862e5af3..e7586b17a8 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -50,6 +50,8 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); +CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC); + CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 51df863ae5..73039274a6 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -719,7 +719,7 @@ CREATE TABLE IF NOT EXISTS edge ( ); CREATE TABLE IF NOT EXISTS edge_event ( - id uuid NOT NULL CONSTRAINT edge_event_pkey PRIMARY KEY, + id uuid NOT NULL, created_time bigint NOT NULL, edge_id uuid, edge_event_type varchar(255), @@ -729,7 +729,7 @@ CREATE TABLE IF NOT EXISTS edge_event ( body varchar(10000000), tenant_id uuid, ts bigint NOT NULL -); +) PARTITION BY RANGE(created_time); CREATE TABLE IF NOT EXISTS rpc ( id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,