Adding partition to edge_event table
This commit is contained in:
parent
6c9ad0399d
commit
11ee41bf34
@ -55,16 +55,16 @@ CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_t
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
p RECORD;
|
||||
prt RECORD;
|
||||
partition_end_ts BIGINT;
|
||||
BEGIN
|
||||
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log
|
||||
WHERE created_time >= start_time_ms AND created_time < end_time_ms
|
||||
LOOP
|
||||
partition_end_ts = p.partition_ts + partition_size_ms;
|
||||
RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
|
||||
partition_end_ts = prt.partition_ts + partition_size_ms;
|
||||
RAISE NOTICE '[audit_log] Partition to create : [%-%]', prt.partition_ts, partition_end_ts;
|
||||
EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' ||
|
||||
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
|
||||
'FOR VALUES FROM ( %s ) TO ( %s )', prt.partition_ts, prt.partition_ts, partition_end_ts);
|
||||
END LOOP;
|
||||
|
||||
INSERT INTO audit_log
|
||||
|
||||
76
application/src/main/data/upgrade/3.4.2/schema_update.sql
Normal file
76
application/src/main/data/upgrade/3.4.2/schema_update.sql
Normal file
@ -0,0 +1,76 @@
|
||||
--
|
||||
-- Copyright © 2016-2022 The Thingsboard Authors
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
DO
|
||||
$$
|
||||
DECLARE table_partition RECORD;
|
||||
BEGIN
|
||||
-- in case of running the upgrade script a second time:
|
||||
IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN
|
||||
ALTER TABLE edge_event RENAME TO old_edge_event;
|
||||
ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time;
|
||||
|
||||
FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
|
||||
FROM pg_tables WHERE tablename LIKE 'edge_event_%'
|
||||
LOOP
|
||||
EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts);
|
||||
END LOOP;
|
||||
ELSE
|
||||
RAISE NOTICE 'Table old_edge_event already exists, leaving as is';
|
||||
END IF;
|
||||
END;
|
||||
$$;
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS edge_event (
|
||||
id uuid NOT NULL,
|
||||
created_time bigint NOT NULL,
|
||||
edge_id uuid,
|
||||
edge_event_type varchar(255),
|
||||
edge_event_uid varchar(255),
|
||||
entity_id uuid,
|
||||
edge_event_action varchar(255),
|
||||
body varchar(10000000),
|
||||
tenant_id uuid,
|
||||
ts bigint NOT NULL
|
||||
) PARTITION BY RANGE (created_time);
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
|
||||
|
||||
|
||||
CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
DECLARE
|
||||
p RECORD;
|
||||
partition_end_ts BIGINT;
|
||||
BEGIN
|
||||
FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event
|
||||
WHERE created_time >= start_time_ms AND created_time < end_time_ms
|
||||
LOOP
|
||||
partition_end_ts = p.partition_ts + partition_size_ms;
|
||||
RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
|
||||
EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' ||
|
||||
'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
|
||||
END LOOP;
|
||||
|
||||
INSERT INTO edge_event
|
||||
SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts
|
||||
FROM old_edge_event
|
||||
WHERE created_time >= start_time_ms AND created_time < end_time_ms;
|
||||
END;
|
||||
$$;
|
||||
|
||||
|
||||
@ -230,9 +230,14 @@ public class ThingsboardInstallService {
|
||||
databaseEntitiesUpgradeService.upgradeDatabase("3.4.1");
|
||||
dataUpdateService.updateData("3.4.1");
|
||||
log.info("Updating system data...");
|
||||
break;
|
||||
case "3.4.2":
|
||||
log.info("Upgrading ThingsBoard from version 3.4.2 to 3.5.0 ...");
|
||||
databaseEntitiesUpgradeService.upgradeDatabase("3.4.2");
|
||||
dataUpdateService.updateData("3.4.2");
|
||||
log.info("Updating system data...");
|
||||
systemDataLoaderService.updateSystemWidgets();
|
||||
break;
|
||||
|
||||
//TODO update CacheCleanupService on the next version upgrade
|
||||
|
||||
default:
|
||||
|
||||
@ -654,6 +654,18 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
log.error("Failed updating schema!!!", e);
|
||||
}
|
||||
break;
|
||||
case "3.4.2":
|
||||
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
|
||||
log.info("Updating schema ...");
|
||||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.4.2", SCHEMA_UPDATE_SQL);
|
||||
loadSql(schemaUpdateFile, conn);
|
||||
log.info("Updating schema settings...");
|
||||
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3005000;");
|
||||
log.info("Schema updated.");
|
||||
} catch (Exception e) {
|
||||
log.error("Failed updating schema!!!", e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);
|
||||
}
|
||||
|
||||
@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.alarm.AlarmDao;
|
||||
import org.thingsboard.server.dao.audit.AuditLogDao;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
@ -142,6 +143,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
@Autowired
|
||||
private AuditLogDao auditLogDao;
|
||||
|
||||
@Autowired
|
||||
private EdgeEventDao edgeEventDao;
|
||||
|
||||
@Override
|
||||
public void updateData(String fromVersion) throws Exception {
|
||||
switch (fromVersion) {
|
||||
@ -189,6 +193,24 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
} else {
|
||||
log.info("Skipping audit logs migration");
|
||||
}
|
||||
boolean skipEdgeEventsMigrationTemp = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
|
||||
if (!skipEdgeEventsMigrationTemp) {
|
||||
log.info("Updating data from version 3.4.1 to 3.4.2 ...");
|
||||
log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
|
||||
edgeEventDao.migrateEdgeEvents();
|
||||
} else {
|
||||
log.info("Skipping edge events migration");
|
||||
}
|
||||
break;
|
||||
case "3.5.0":
|
||||
boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
|
||||
if (!skipEdgeEventsMigration) {
|
||||
log.info("Updating data from version 3.4.2 to 3.5.0 ...");
|
||||
log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
|
||||
edgeEventDao.migrateEdgeEvents();
|
||||
} else {
|
||||
log.info("Skipping edge events migration");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
||||
|
||||
@ -17,15 +17,23 @@ package org.thingsboard.server.service.ttl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventService;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME;
|
||||
|
||||
@TbCoreComponent
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnExpression("${sql.ttl.edge_events.enabled:true} && ${sql.ttl.edge_events.edge_event_ttl:0} > 0")
|
||||
public class EdgeEventsCleanUpService extends AbstractCleanUpService {
|
||||
|
||||
public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION =
|
||||
@ -34,20 +42,29 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
|
||||
@Value("${sql.ttl.edge_events.edge_events_ttl}")
|
||||
private long ttl;
|
||||
|
||||
@Value("${sql.ttl.edge_events.enabled}")
|
||||
@Value("${sql.edge_events.partition_size:168}")
|
||||
private int partitionSizeInHours;
|
||||
|
||||
@Value("${sql.ttl.edge_events.enabled:true}")
|
||||
private boolean ttlTaskExecutionEnabled;
|
||||
|
||||
private final EdgeEventService edgeEventService;
|
||||
|
||||
public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService) {
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
|
||||
public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService, SqlPartitioningRepository partitioningRepository) {
|
||||
super(partitionService);
|
||||
this.edgeEventService = edgeEventService;
|
||||
this.partitioningRepository = partitioningRepository;
|
||||
}
|
||||
|
||||
@Scheduled(initialDelayString = RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
|
||||
public void cleanUp() {
|
||||
if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
|
||||
edgeEventService.cleanupEvents(ttl);
|
||||
long edgeEventsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttl);
|
||||
if(isSystemTenantPartitionMine()) {
|
||||
edgeEventService.cleanupEvents(edgeEventsExpTime);
|
||||
} else {
|
||||
partitioningRepository.cleanupPartitionsCache(EDGE_EVENT_COLUMN_FAMILY_NAME, edgeEventsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -265,6 +265,7 @@ sql:
|
||||
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
|
||||
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
|
||||
stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
|
||||
partition_size: "${SQL_EDGE_EVENTS_PARTITION_SIZE_HOURS:168}" # Number of hours to partition the events. The current value corresponds to one week.
|
||||
audit_logs:
|
||||
partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
|
||||
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
||||
|
||||
@ -178,7 +178,6 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
|
||||
reset(partitioningRepository);
|
||||
AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId);
|
||||
verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs));
|
||||
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
assertThat(partitions).singleElement().satisfies(partitionStartTs -> {
|
||||
assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs));
|
||||
|
||||
@ -22,6 +22,9 @@ import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
@ -29,16 +32,27 @@ import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.security.Authority;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.service.ttl.EdgeEventsCleanUpService;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||
|
||||
@TestPropertySource(properties = {
|
||||
@ -50,6 +64,18 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
|
||||
private Tenant savedTenant;
|
||||
private User tenantAdmin;
|
||||
|
||||
@Autowired
|
||||
private EdgeEventDao edgeEventDao;
|
||||
@SpyBean
|
||||
private SqlPartitioningRepository partitioningRepository;
|
||||
@Autowired
|
||||
private EdgeEventsCleanUpService edgeEventsCleanUpService;
|
||||
|
||||
@Value("#{${sql.edge_events.partition_size} * 60 * 60 * 1000}")
|
||||
private long partitionDurationInMs;
|
||||
@Value("${sql.ttl.edge_events.edge_event_ttl}")
|
||||
private long edgeEventTtlInSec;
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
loginSysAdmin();
|
||||
@ -114,6 +140,34 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
|
||||
Assert.assertTrue(edgeEvents.stream().anyMatch(ee -> EdgeEventType.RELATION.equals(ee.getType())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveEdgeEvent_thenCreatePartitionIfNotExist() {
|
||||
reset(partitioningRepository);
|
||||
EdgeEvent edgeEvent = createEdgeEvent();
|
||||
verify(partitioningRepository).createPartitionIfNotExists(eq("edge_event"), eq(edgeEvent.getCreatedTime()), eq(partitionDurationInMs));
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("edge_event");
|
||||
assertThat(partitions).singleElement().satisfies(partitionStartTs -> {
|
||||
assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(edgeEvent.getCreatedTime(), partitionDurationInMs));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cleanUpEdgeEventByTtl_dropOldPartitions() {
|
||||
long oldEdgeEventTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
|
||||
long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldEdgeEventTs, partitionDurationInMs);
|
||||
partitioningRepository.createPartitionIfNotExists("edge_event", oldEdgeEventTs, partitionDurationInMs);
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("edge_event");
|
||||
assertThat(partitions).contains(partitionStartTs);
|
||||
|
||||
edgeEventsCleanUpService.cleanUp();
|
||||
partitions = partitioningRepository.fetchPartitions("edge_event");
|
||||
assertThat(partitions).doesNotContain(partitionStartTs);
|
||||
assertThat(partitions).allSatisfy(partitionsStart -> {
|
||||
long partitionEndTs = partitionsStart + partitionDurationInMs;
|
||||
assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edgeEventTtlInSec));
|
||||
});
|
||||
}
|
||||
|
||||
private List<EdgeEvent> findEdgeEvents(EdgeId edgeId) throws Exception {
|
||||
return doGetTypedWithTimePageLink("/api/edge/" + edgeId.toString() + "/events?",
|
||||
new TypeReference<PageData<EdgeEvent>>() {
|
||||
@ -134,4 +188,19 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
|
||||
return asset;
|
||||
}
|
||||
|
||||
private EdgeEvent createEdgeEvent() {
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
edgeEvent.setCreatedTime(System.currentTimeMillis());
|
||||
edgeEvent.setTenantId(tenantId);
|
||||
edgeEvent.setAction(EdgeEventActionType.ADDED);
|
||||
edgeEvent.setEntityId(tenantAdmin.getUuidId());
|
||||
edgeEvent.setType(EdgeEventType.ALARM);
|
||||
try {
|
||||
edgeEventDao.saveAsync(edgeEvent).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return edgeEvent;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -59,4 +59,7 @@ queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0
|
||||
usage.stats.report.enabled=false
|
||||
|
||||
sql.audit_logs.partition_size=24
|
||||
sql.ttl.audit_logs.ttl=2592000
|
||||
sql.ttl.audit_logs.ttl=2592000
|
||||
|
||||
sql.edge_events.partition_size=168
|
||||
sql.ttl.edge_events.edge_event_ttl=2592000
|
||||
@ -17,7 +17,6 @@ package org.thingsboard.server.dao.edge;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -30,11 +29,14 @@ import org.thingsboard.server.dao.service.DataValidator;
|
||||
@Slf4j
|
||||
public class BaseEdgeEventService implements EdgeEventService {
|
||||
|
||||
@Autowired
|
||||
private EdgeEventDao edgeEventDao;
|
||||
private final EdgeEventDao edgeEventDao;
|
||||
|
||||
@Autowired
|
||||
private DataValidator<EdgeEvent> edgeEventValidator;
|
||||
private final DataValidator<EdgeEvent> edgeEventValidator;
|
||||
|
||||
public BaseEdgeEventService(EdgeEventDao edgeEventDao, DataValidator<EdgeEvent> edgeEventValidator) {
|
||||
this.edgeEventDao = edgeEventDao;
|
||||
this.edgeEventValidator = edgeEventValidator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
|
||||
|
||||
@ -54,4 +54,6 @@ public interface EdgeEventDao extends Dao<EdgeEvent> {
|
||||
*/
|
||||
void cleanupEvents(long ttl);
|
||||
|
||||
void migrateEdgeEvents();
|
||||
|
||||
}
|
||||
|
||||
@ -17,10 +17,11 @@ package org.thingsboard.server.dao.sql.edge;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
@ -31,19 +32,17 @@ import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
|
||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Comparator;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
@ -51,19 +50,25 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
|
||||
|
||||
private final UUID systemTenantId = NULL_UUID;
|
||||
|
||||
@Autowired
|
||||
ScheduledLogExecutorComponent logExecutor;
|
||||
private final ScheduledLogExecutorComponent logExecutor;
|
||||
|
||||
@Autowired
|
||||
private StatsFactory statsFactory;
|
||||
private final StatsFactory statsFactory;
|
||||
|
||||
private final EdgeEventRepository edgeEventRepository;
|
||||
|
||||
private final EdgeEventInsertRepository edgeEventInsertRepository;
|
||||
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Value("${sql.edge_events.batch_size:1000}")
|
||||
private int batchSize;
|
||||
@ -74,14 +79,15 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
|
||||
@Value("${sql.edge_events.stats_print_interval_ms:10000}")
|
||||
private long statsPrintIntervalMs;
|
||||
|
||||
@Value("${sql.edge_events.partitions_size:168}")
|
||||
private int partitionSizeInHours;
|
||||
|
||||
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
|
||||
private long edge_events_ttl;
|
||||
|
||||
private static final String TABLE_NAME = ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME;
|
||||
private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
|
||||
|
||||
@Autowired
|
||||
private EdgeEventRepository edgeEventRepository;
|
||||
|
||||
@Autowired
|
||||
private EdgeEventInsertRepository edgeEventInsertRepository;
|
||||
|
||||
@Override
|
||||
protected Class<EdgeEventEntity> getEntityClass() {
|
||||
return EdgeEventEntity.class;
|
||||
@ -140,6 +146,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
|
||||
if (StringUtils.isEmpty(edgeEvent.getUid())) {
|
||||
edgeEvent.setUid(edgeEvent.getId().toString());
|
||||
}
|
||||
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, edgeEvent.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
return save(new EdgeEventEntity(edgeEvent));
|
||||
}
|
||||
|
||||
@ -189,20 +196,38 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
|
||||
|
||||
@Override
|
||||
public void cleanupEvents(long ttl) {
|
||||
log.info("Going to cleanup old edge events using ttl: {}s", ttl);
|
||||
try (Connection connection = dataSource.getConnection();
|
||||
PreparedStatement stmt = connection.prepareStatement("call cleanup_edge_events_by_ttl(?,?)")) {
|
||||
stmt.setLong(1, ttl);
|
||||
stmt.setLong(2, 0);
|
||||
stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
|
||||
stmt.execute();
|
||||
printWarnings(stmt);
|
||||
try (ResultSet resultSet = stmt.getResultSet()) {
|
||||
resultSet.next();
|
||||
log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
log.error("SQLException occurred during edge events TTL task execution ", e);
|
||||
}
|
||||
partitioningRepository.dropPartitionsBefore(TABLE_NAME, ttl, TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateEdgeEvents() {
|
||||
long startTime = edge_events_ttl > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edge_events_ttl) : 1480982400000L;
|
||||
|
||||
long currentTime = System.currentTimeMillis();
|
||||
var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
|
||||
long numberOfPartitions = (currentTime - startTime) / partitionStepInMs;
|
||||
|
||||
if (numberOfPartitions > 1000) {
|
||||
String error = "Please adjust your edge event partitioning configuration. Configuration with partition size " +
|
||||
"of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " +
|
||||
"(> 1000) partitions which is not recommended!";
|
||||
log.error(error);
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
|
||||
while (startTime < currentTime) {
|
||||
var endTime = startTime + partitionStepInMs;
|
||||
log.info("Migrating edge event for time period: {} - {}", startTime, endTime);
|
||||
callMigrationFunction(startTime, endTime, partitionStepInMs);
|
||||
startTime = endTime;
|
||||
}
|
||||
log.info("Event edge migration finished");
|
||||
|
||||
jdbcTemplate.execute("DROP TABLE IF EXISTS old_edge_event");
|
||||
}
|
||||
|
||||
private void callMigrationFunction(long startTime, long endTime, long partitionSIzeInMs) {
|
||||
jdbcTemplate.update("CALL migrate_edge_event(?, ?, ?)", startTime, endTime, partitionSIzeInMs);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -50,6 +50,8 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id);
|
||||
|
||||
@ -719,7 +719,7 @@ CREATE TABLE IF NOT EXISTS edge (
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS edge_event (
|
||||
id uuid NOT NULL CONSTRAINT edge_event_pkey PRIMARY KEY,
|
||||
id uuid NOT NULL,
|
||||
created_time bigint NOT NULL,
|
||||
edge_id uuid,
|
||||
edge_event_type varchar(255),
|
||||
@ -729,7 +729,7 @@ CREATE TABLE IF NOT EXISTS edge_event (
|
||||
body varchar(10000000),
|
||||
tenant_id uuid,
|
||||
ts bigint NOT NULL
|
||||
);
|
||||
) PARTITION BY RANGE(created_time);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rpc (
|
||||
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user