Merge pull request #7564 from AndriiLandiak/edge-event-partition
[3.4.2] Table partitioning for edge event.
This commit is contained in:
		
						commit
						0d626c5c88
					
				@ -14,6 +14,7 @@
 | 
			
		||||
-- limitations under the License.
 | 
			
		||||
--
 | 
			
		||||
 | 
			
		||||
-- AUDIT LOGS MIGRATION START
 | 
			
		||||
DO
 | 
			
		||||
$$
 | 
			
		||||
    DECLARE table_partition RECORD;
 | 
			
		||||
@ -73,3 +74,64 @@ BEGIN
 | 
			
		||||
    WHERE created_time >= start_time_ms AND created_time < end_time_ms;
 | 
			
		||||
END;
 | 
			
		||||
$$;
 | 
			
		||||
-- AUDIT LOGS MIGRATION END
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- EDGE EVENTS MIGRATION START
 | 
			
		||||
DO
 | 
			
		||||
$$
 | 
			
		||||
    DECLARE table_partition RECORD;
 | 
			
		||||
    BEGIN
 | 
			
		||||
    -- in case of running the upgrade script a second time:
 | 
			
		||||
        IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN
 | 
			
		||||
            ALTER TABLE edge_event RENAME TO old_edge_event;
 | 
			
		||||
            ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time;
 | 
			
		||||
 | 
			
		||||
            FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
 | 
			
		||||
            FROM pg_tables WHERE tablename LIKE 'edge_event_%'
 | 
			
		||||
            LOOP
 | 
			
		||||
                EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts);
 | 
			
		||||
            END LOOP;
 | 
			
		||||
        ELSE
 | 
			
		||||
            RAISE NOTICE 'Table old_edge_event already exists, leaving as is';
 | 
			
		||||
        END IF;
 | 
			
		||||
END;
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
CREATE TABLE IF NOT EXISTS edge_event (
 | 
			
		||||
    id uuid NOT NULL,
 | 
			
		||||
    created_time bigint NOT NULL,
 | 
			
		||||
    edge_id uuid,
 | 
			
		||||
    edge_event_type varchar(255),
 | 
			
		||||
    edge_event_uid varchar(255),
 | 
			
		||||
    entity_id uuid,
 | 
			
		||||
    edge_event_action varchar(255),
 | 
			
		||||
    body varchar(10000000),
 | 
			
		||||
    tenant_id uuid,
 | 
			
		||||
    ts bigint NOT NULL
 | 
			
		||||
    ) PARTITION BY RANGE (created_time);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
 | 
			
		||||
 | 
			
		||||
CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
 | 
			
		||||
    LANGUAGE plpgsql AS
 | 
			
		||||
$$
 | 
			
		||||
DECLARE
 | 
			
		||||
    p RECORD;
 | 
			
		||||
    partition_end_ts BIGINT;
 | 
			
		||||
BEGIN
 | 
			
		||||
    FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event
 | 
			
		||||
    WHERE created_time >= start_time_ms AND created_time < end_time_ms
 | 
			
		||||
    LOOP
 | 
			
		||||
        partition_end_ts = p.partition_ts + partition_size_ms;
 | 
			
		||||
        RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
 | 
			
		||||
        EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' ||
 | 
			
		||||
               'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
 | 
			
		||||
    END LOOP;
 | 
			
		||||
 | 
			
		||||
    INSERT INTO edge_event
 | 
			
		||||
    SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts
 | 
			
		||||
    FROM old_edge_event
 | 
			
		||||
    WHERE created_time >= start_time_ms AND created_time < end_time_ms;
 | 
			
		||||
END;
 | 
			
		||||
$$;
 | 
			
		||||
-- EDGE EVENTS MIGRATION END
 | 
			
		||||
 | 
			
		||||
@ -64,6 +64,7 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmDao;
 | 
			
		||||
import org.thingsboard.server.dao.audit.AuditLogDao;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.entityview.EntityViewService;
 | 
			
		||||
import org.thingsboard.server.dao.event.EventService;
 | 
			
		||||
@ -142,6 +143,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private AuditLogDao auditLogDao;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventDao edgeEventDao;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateData(String fromVersion) throws Exception {
 | 
			
		||||
        switch (fromVersion) {
 | 
			
		||||
@ -181,14 +185,21 @@ public class DefaultDataUpdateService implements DataUpdateService {
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            case "3.4.1":
 | 
			
		||||
                log.info("Updating data from version 3.4.1 to 3.4.2 ...");
 | 
			
		||||
                boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false);
 | 
			
		||||
                if (!skipAuditLogsMigration) {
 | 
			
		||||
                    log.info("Updating data from version 3.4.1 to 3.4.2 ...");
 | 
			
		||||
                    log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true");
 | 
			
		||||
                    auditLogDao.migrateAuditLogs();
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("Skipping audit logs migration");
 | 
			
		||||
                }
 | 
			
		||||
                boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
 | 
			
		||||
                if (!skipEdgeEventsMigration) {
 | 
			
		||||
                    log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
 | 
			
		||||
                    edgeEventDao.migrateEdgeEvents();
 | 
			
		||||
                } else {
 | 
			
		||||
                    log.info("Skipping edge events migration");
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            default:
 | 
			
		||||
                throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
 | 
			
		||||
 | 
			
		||||
@ -17,15 +17,22 @@ package org.thingsboard.server.service.ttl;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeEventService;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME;
 | 
			
		||||
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Service
 | 
			
		||||
@ConditionalOnExpression("${sql.ttl.edge_events.enabled:true} && ${sql.ttl.edge_events.edge_event_ttl:0} > 0")
 | 
			
		||||
public class EdgeEventsCleanUpService extends AbstractCleanUpService {
 | 
			
		||||
 | 
			
		||||
    public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION =
 | 
			
		||||
@ -34,20 +41,29 @@ public class EdgeEventsCleanUpService extends AbstractCleanUpService {
 | 
			
		||||
    @Value("${sql.ttl.edge_events.edge_events_ttl}")
 | 
			
		||||
    private long ttl;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.edge_events.enabled}")
 | 
			
		||||
    @Value("${sql.edge_events.partition_size:168}")
 | 
			
		||||
    private int partitionSizeInHours;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.edge_events.enabled:true}")
 | 
			
		||||
    private boolean ttlTaskExecutionEnabled;
 | 
			
		||||
 | 
			
		||||
    private final EdgeEventService edgeEventService;
 | 
			
		||||
 | 
			
		||||
    public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService) {
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
 | 
			
		||||
    public EdgeEventsCleanUpService(PartitionService partitionService, EdgeEventService edgeEventService, SqlPartitioningRepository partitioningRepository) {
 | 
			
		||||
        super(partitionService);
 | 
			
		||||
        this.edgeEventService = edgeEventService;
 | 
			
		||||
        this.partitioningRepository = partitioningRepository;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(initialDelayString = RANDOM_DELAY_INTERVAL_MS_EXPRESSION, fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
 | 
			
		||||
    public void cleanUp() {
 | 
			
		||||
        long edgeEventsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttl);
 | 
			
		||||
        if (ttlTaskExecutionEnabled && isSystemTenantPartitionMine()) {
 | 
			
		||||
            edgeEventService.cleanupEvents(ttl);
 | 
			
		||||
            edgeEventService.cleanupEvents(edgeEventsExpTime);
 | 
			
		||||
        } else {
 | 
			
		||||
            partitioningRepository.cleanupPartitionsCache(EDGE_EVENT_COLUMN_FAMILY_NAME, edgeEventsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -273,6 +273,7 @@ sql:
 | 
			
		||||
    batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
 | 
			
		||||
    batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
 | 
			
		||||
    stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
 | 
			
		||||
    partition_size: "${SQL_EDGE_EVENTS_PARTITION_SIZE_HOURS:168}" # Number of hours to partition the events. The current value corresponds to one week.
 | 
			
		||||
  audit_logs:
 | 
			
		||||
    partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
 | 
			
		||||
  # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,9 @@ import org.junit.After;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.test.mock.mockito.SpyBean;
 | 
			
		||||
import org.springframework.test.context.TestPropertySource;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.Tenant;
 | 
			
		||||
@ -29,16 +32,27 @@ import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.asset.Asset;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.Edge;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.security.Authority;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.service.ttl.EdgeEventsCleanUpService;
 | 
			
		||||
 | 
			
		||||
import java.time.LocalDate;
 | 
			
		||||
import java.time.ZoneOffset;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import static org.mockito.ArgumentMatchers.eq;
 | 
			
		||||
import static org.mockito.Mockito.reset;
 | 
			
		||||
import static org.mockito.Mockito.verify;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 | 
			
		||||
 | 
			
		||||
@TestPropertySource(properties = {
 | 
			
		||||
@ -50,6 +64,18 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
 | 
			
		||||
    private Tenant savedTenant;
 | 
			
		||||
    private User tenantAdmin;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventDao edgeEventDao;
 | 
			
		||||
    @SpyBean
 | 
			
		||||
    private SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventsCleanUpService edgeEventsCleanUpService;
 | 
			
		||||
 | 
			
		||||
    @Value("#{${sql.edge_events.partition_size} * 60 * 60 * 1000}")
 | 
			
		||||
    private long partitionDurationInMs;
 | 
			
		||||
    @Value("${sql.ttl.edge_events.edge_event_ttl}")
 | 
			
		||||
    private long edgeEventTtlInSec;
 | 
			
		||||
 | 
			
		||||
    @Before
 | 
			
		||||
    public void beforeTest() throws Exception {
 | 
			
		||||
        loginSysAdmin();
 | 
			
		||||
@ -114,6 +140,34 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
 | 
			
		||||
        Assert.assertTrue(edgeEvents.stream().anyMatch(ee -> EdgeEventType.RELATION.equals(ee.getType())));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void saveEdgeEvent_thenCreatePartitionIfNotExist() {
 | 
			
		||||
        reset(partitioningRepository);
 | 
			
		||||
        EdgeEvent edgeEvent = createEdgeEvent();
 | 
			
		||||
        verify(partitioningRepository).createPartitionIfNotExists(eq("edge_event"), eq(edgeEvent.getCreatedTime()), eq(partitionDurationInMs));
 | 
			
		||||
        List<Long> partitions = partitioningRepository.fetchPartitions("edge_event");
 | 
			
		||||
        assertThat(partitions).singleElement().satisfies(partitionStartTs -> {
 | 
			
		||||
            assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(edgeEvent.getCreatedTime(), partitionDurationInMs));
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void cleanUpEdgeEventByTtl_dropOldPartitions() {
 | 
			
		||||
        long oldEdgeEventTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli();
 | 
			
		||||
        long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldEdgeEventTs, partitionDurationInMs);
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists("edge_event", oldEdgeEventTs, partitionDurationInMs);
 | 
			
		||||
        List<Long> partitions = partitioningRepository.fetchPartitions("edge_event");
 | 
			
		||||
        assertThat(partitions).contains(partitionStartTs);
 | 
			
		||||
 | 
			
		||||
        edgeEventsCleanUpService.cleanUp();
 | 
			
		||||
        partitions = partitioningRepository.fetchPartitions("edge_event");
 | 
			
		||||
        assertThat(partitions).doesNotContain(partitionStartTs);
 | 
			
		||||
        assertThat(partitions).allSatisfy(partitionsStart -> {
 | 
			
		||||
            long partitionEndTs = partitionsStart + partitionDurationInMs;
 | 
			
		||||
            assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edgeEventTtlInSec));
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<EdgeEvent> findEdgeEvents(EdgeId edgeId) throws Exception {
 | 
			
		||||
        return doGetTypedWithTimePageLink("/api/edge/" + edgeId.toString() + "/events?",
 | 
			
		||||
                new TypeReference<PageData<EdgeEvent>>() {
 | 
			
		||||
@ -134,4 +188,19 @@ public abstract class BaseEdgeEventControllerTest extends AbstractControllerTest
 | 
			
		||||
        return asset;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEvent createEdgeEvent() {
 | 
			
		||||
        EdgeEvent edgeEvent = new EdgeEvent();
 | 
			
		||||
        edgeEvent.setCreatedTime(System.currentTimeMillis());
 | 
			
		||||
        edgeEvent.setTenantId(tenantId);
 | 
			
		||||
        edgeEvent.setAction(EdgeEventActionType.ADDED);
 | 
			
		||||
        edgeEvent.setEntityId(tenantAdmin.getUuidId());
 | 
			
		||||
        edgeEvent.setType(EdgeEventType.ALARM);
 | 
			
		||||
        try {
 | 
			
		||||
            edgeEventDao.saveAsync(edgeEvent).get();
 | 
			
		||||
        } catch (InterruptedException | ExecutionException e) {
 | 
			
		||||
            throw new RuntimeException(e);
 | 
			
		||||
        }
 | 
			
		||||
        return edgeEvent;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -60,3 +60,6 @@ usage.stats.report.enabled=false
 | 
			
		||||
 | 
			
		||||
sql.audit_logs.partition_size=24
 | 
			
		||||
sql.ttl.audit_logs.ttl=2592000
 | 
			
		||||
 | 
			
		||||
sql.edge_events.partition_size=168
 | 
			
		||||
sql.ttl.edge_events.edge_event_ttl=2592000
 | 
			
		||||
 | 
			
		||||
@ -16,8 +16,8 @@
 | 
			
		||||
package org.thingsboard.server.dao.edge;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
@ -28,13 +28,12 @@ import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@AllArgsConstructor
 | 
			
		||||
public class BaseEdgeEventService implements EdgeEventService {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventDao edgeEventDao;
 | 
			
		||||
    private final EdgeEventDao edgeEventDao;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private DataValidator<EdgeEvent> edgeEventValidator;
 | 
			
		||||
    private final DataValidator<EdgeEvent> edgeEventValidator;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
 | 
			
		||||
 | 
			
		||||
@ -54,4 +54,6 @@ public interface EdgeEventDao extends Dao<EdgeEvent> {
 | 
			
		||||
     */
 | 
			
		||||
    void cleanupEvents(long ttl);
 | 
			
		||||
 | 
			
		||||
    void migrateEdgeEvents();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,10 +17,11 @@ package org.thingsboard.server.dao.sql.edge;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.jdbc.core.JdbcTemplate;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
@ -31,19 +32,17 @@ import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.common.stats.StatsFactory;
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
 | 
			
		||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
 | 
			
		||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.sql.Connection;
 | 
			
		||||
import java.sql.PreparedStatement;
 | 
			
		||||
import java.sql.ResultSet;
 | 
			
		||||
import java.sql.SQLException;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -52,18 +51,25 @@ import java.util.function.Function;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Component
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
 | 
			
		||||
 | 
			
		||||
    private final UUID systemTenantId = NULL_UUID;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    ScheduledLogExecutorComponent logExecutor;
 | 
			
		||||
    private final ScheduledLogExecutorComponent logExecutor;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private StatsFactory statsFactory;
 | 
			
		||||
    private final StatsFactory statsFactory;
 | 
			
		||||
 | 
			
		||||
    private final EdgeEventRepository edgeEventRepository;
 | 
			
		||||
 | 
			
		||||
    private final EdgeEventInsertRepository edgeEventInsertRepository;
 | 
			
		||||
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
 | 
			
		||||
    private final JdbcTemplate jdbcTemplate;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.edge_events.batch_size:1000}")
 | 
			
		||||
    private int batchSize;
 | 
			
		||||
@ -74,14 +80,16 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
 | 
			
		||||
    @Value("${sql.edge_events.stats_print_interval_ms:10000}")
 | 
			
		||||
    private long statsPrintIntervalMs;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.edge_events.partitions_size:168}")
 | 
			
		||||
    private int partitionSizeInHours;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
 | 
			
		||||
    private long edge_events_ttl;
 | 
			
		||||
 | 
			
		||||
    private static final String TABLE_NAME = ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME;
 | 
			
		||||
 | 
			
		||||
    private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventRepository edgeEventRepository;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private EdgeEventInsertRepository edgeEventInsertRepository;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected Class<EdgeEventEntity> getEntityClass() {
 | 
			
		||||
        return EdgeEventEntity.class;
 | 
			
		||||
@ -140,6 +148,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
 | 
			
		||||
        if (StringUtils.isEmpty(edgeEvent.getUid())) {
 | 
			
		||||
            edgeEvent.setUid(edgeEvent.getId().toString());
 | 
			
		||||
        }
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(TABLE_NAME, edgeEvent.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        return save(new EdgeEventEntity(edgeEvent));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -189,20 +198,36 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void cleanupEvents(long ttl) {
 | 
			
		||||
        log.info("Going to cleanup old edge events using ttl: {}s", ttl);
 | 
			
		||||
        try (Connection connection = dataSource.getConnection();
 | 
			
		||||
             PreparedStatement stmt = connection.prepareStatement("call cleanup_edge_events_by_ttl(?,?)")) {
 | 
			
		||||
            stmt.setLong(1, ttl);
 | 
			
		||||
            stmt.setLong(2, 0);
 | 
			
		||||
            stmt.setQueryTimeout((int) TimeUnit.HOURS.toSeconds(1));
 | 
			
		||||
            stmt.execute();
 | 
			
		||||
            printWarnings(stmt);
 | 
			
		||||
            try (ResultSet resultSet = stmt.getResultSet()) {
 | 
			
		||||
                resultSet.next();
 | 
			
		||||
                log.info("Total edge events removed by TTL: [{}]", resultSet.getLong(1));
 | 
			
		||||
            }
 | 
			
		||||
        } catch (SQLException e) {
 | 
			
		||||
            log.error("SQLException occurred during edge events TTL task execution ", e);
 | 
			
		||||
        partitioningRepository.dropPartitionsBefore(TABLE_NAME, ttl, TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void migrateEdgeEvents() {
 | 
			
		||||
        long startTime = edge_events_ttl > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(edge_events_ttl) : 1629158400000L;
 | 
			
		||||
 | 
			
		||||
        long currentTime = System.currentTimeMillis();
 | 
			
		||||
        var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
 | 
			
		||||
        long numberOfPartitions = (currentTime - startTime) / partitionStepInMs;
 | 
			
		||||
 | 
			
		||||
        if (numberOfPartitions > 1000) {
 | 
			
		||||
            String error = "Please adjust your edge event partitioning configuration. Configuration with partition size " +
 | 
			
		||||
                    "of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " +
 | 
			
		||||
                    "(> 1000) partitions which is not recommended!";
 | 
			
		||||
            log.error(error);
 | 
			
		||||
            throw new RuntimeException(error);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        while (startTime < currentTime) {
 | 
			
		||||
            var endTime = startTime + partitionStepInMs;
 | 
			
		||||
            log.info("Migrating edge event for time period: {} - {}", startTime, endTime);
 | 
			
		||||
            callMigrationFunction(startTime, endTime, partitionStepInMs);
 | 
			
		||||
            startTime = endTime;
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Event edge migration finished");
 | 
			
		||||
        jdbcTemplate.execute("DROP TABLE IF EXISTS old_edge_event");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void callMigrationFunction(long startTime, long endTime, long partitionSIzeInMs) {
 | 
			
		||||
        jdbcTemplate.update("CALL migrate_edge_event(?, ?, ?)", startTime, endTime, partitionSIzeInMs);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -50,6 +50,8 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
 | 
			
		||||
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id);
 | 
			
		||||
 | 
			
		||||
@ -719,7 +719,7 @@ CREATE TABLE IF NOT EXISTS edge (
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
CREATE TABLE IF NOT EXISTS edge_event (
 | 
			
		||||
    id uuid NOT NULL CONSTRAINT edge_event_pkey PRIMARY KEY,
 | 
			
		||||
    id uuid NOT NULL,
 | 
			
		||||
    created_time bigint NOT NULL,
 | 
			
		||||
    edge_id uuid,
 | 
			
		||||
    edge_event_type varchar(255),
 | 
			
		||||
@ -729,7 +729,7 @@ CREATE TABLE IF NOT EXISTS edge_event (
 | 
			
		||||
    body varchar(10000000),
 | 
			
		||||
    tenant_id uuid,
 | 
			
		||||
    ts bigint NOT NULL
 | 
			
		||||
);
 | 
			
		||||
) PARTITION BY RANGE(created_time);
 | 
			
		||||
 | 
			
		||||
CREATE TABLE IF NOT EXISTS rpc (
 | 
			
		||||
    id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user