Support for bulk delete of events

This commit is contained in:
Andrii Shvaika 2022-07-27 15:30:08 +03:00
parent 6a4140bd28
commit 3af07efd7f
13 changed files with 435 additions and 179 deletions

View File

@ -259,6 +259,8 @@ sql:
batch_max_delay: "${SQL_EVENTS_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_EVENTS_BATCH_STATS_PRINT_MS:10000}"
batch_threads: "${SQL_EVENTS_BATCH_THREADS:3}" # batch thread count have to be a prime number like 3 or 5 to gain perfect hash distribution
partition_size: "${SQL_EVENTS_REGULAR_PARTITION_SIZE_HOURS:168}" # Number of hours to partition the events. The current value corresponds to one week.
debug_partition_size: "${SQL_EVENTS_REGULAR_PARTITION_SIZE_HOURS:1}" # Number of hours to partition the debug events. The current value corresponds to one hour.
edge_events:
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
@ -285,8 +287,10 @@ sql:
events:
enabled: "${SQL_TTL_EVENTS_ENABLED:true}"
execution_interval_ms: "${SQL_TTL_EVENTS_EXECUTION_INTERVAL:3600000}" # Number of milliseconds (max random initial delay and fixed period).
events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}" # Number of seconds
debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}" # Number of seconds. The current value corresponds to one week
# Number of seconds. TTL is disabled by default. Accuracy of the cleanup depends on the sql.events.partition_size parameter.
events_ttl: "${SQL_TTL_EVENTS_EVENTS_TTL:0}"
# Number of seconds. The current value corresponds to one week. Accuracy of the cleanup depends on the sql.events.debug_partition_size parameter.
debug_events_ttl: "${SQL_TTL_EVENTS_DEBUG_EVENTS_TTL:604800}"
edge_events:
enabled: "${SQL_TTL_EDGE_EVENTS_ENABLED:true}"
execution_interval_ms: "${SQL_TTL_EDGE_EVENTS_EXECUTION_INTERVAL:86400000}" # Number of milliseconds. The current value corresponds to one day

View File

@ -27,9 +27,13 @@ public class StatisticsEventFilter implements EventFilter {
@ApiModelProperty(position = 1, value = "String value representing the server name, identifier or ip address where the platform is running", example = "ip-172-31-24-152")
protected String server;
@ApiModelProperty(position = 2, value = "The minimum number of successfully processed messages", example = "25")
protected Integer messagesProcessed;
@ApiModelProperty(position = 3, value = "The minimum number of errors occurred during messages processing", example = "30")
protected Integer errorsOccurred;
protected Integer minMessagesProcessed;
@ApiModelProperty(position = 3, value = "The maximum number of successfully processed messages", example = "250")
protected Integer maxMessagesProcessed;
@ApiModelProperty(position = 4, value = "The minimum number of errors occurred during messages processing", example = "30")
protected Integer minErrorsOccurred;
@ApiModelProperty(position = 5, value = "The maximum number of errors occurred during messages processing", example = "300")
protected Integer maxErrorsOccurred;
@Override
public EventType getEventType() {
@ -38,6 +42,8 @@ public class StatisticsEventFilter implements EventFilter {
@Override
public boolean isNotEmpty() {
return !StringUtils.isEmpty(server) || (messagesProcessed != null && messagesProcessed > 0) || (errorsOccurred != null && errorsOccurred > 0);
return !StringUtils.isEmpty(server)
|| (minMessagesProcessed != null && minMessagesProcessed > 0) || (minErrorsOccurred != null && minErrorsOccurred > 0)
|| (maxMessagesProcessed != null && maxMessagesProcessed > 0) || (maxErrorsOccurred != null && maxErrorsOccurred > 0);
}
}

View File

@ -117,19 +117,11 @@ public class BaseEventService implements EventService {
@Override
public void removeEvents(TenantId tenantId, EntityId entityId, EventFilter eventFilter, Long startTime, Long endTime) {
// TimePageLink eventsPageLink = new TimePageLink(1000, 0, null, null, startTime, endTime);
// PageData<EventInfo> eventsPageData;
// do {
// if (eventFilter == null) {
// eventsPageData = findEvents(tenantId, entityId, eventsPageLink);
// } else {
// eventsPageData = findEventsByFilter(tenantId, entityId, eventFilter, eventsPageLink);
// }
//
// eventDao.removeAllByIds(eventsPageData.getData().stream()
// .map(IdBased::getUuidId)
// .collect(Collectors.toList()));
// } while (eventsPageData.hasNext());
if (eventFilter == null) {
eventDao.removeEvents(tenantId.getId(), entityId.getId(), startTime, endTime);
} else {
eventDao.removeEvents(tenantId.getId(), entityId.getId(), eventFilter, startTime, endTime);
}
}
@Override

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.EventFilter;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.Dao;
@ -71,4 +72,26 @@ public interface EventDao {
* @param debugEventExpTs the expiration time of the debug events
*/
void cleanupEvents(long regularEventExpTs, long debugEventExpTs);
/**
* Removes all events for the specified entity and time interval
*
* @param tenantId
* @param entityId
* @param startTime
* @param endTime
*/
void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime);
/**
*
* Removes all events for the specified entity, event filter and time interval
*
* @param tenantId
* @param entityId
* @param eventFilter
* @param startTime
* @param endTime
*/
void removeEvents(UUID tenantId, UUID entityId, EventFilter eventFilter, Long startTime, Long endTime);
}

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.dao.model.sql.ErrorEventEntity;
@ -77,4 +79,36 @@ public interface ErrorEventRepository extends EventRepository<ErrorEventEntity,
@Param("error") String error,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM ErrorEventEntity e WHERE " +
"e.tenantId = :tenantId " +
"AND e.entityId = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime);
@Transactional
@Modifying
@Query(nativeQuery = true,
value = "DELETE FROM error_event e WHERE " +
"e.tenant_id = :tenantId " +
"AND e.entity_id = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:method IS NULL OR e.e_method ILIKE concat('%', :method, '%')) " +
"AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("method") String method,
@Param("error") String error);
}

View File

@ -0,0 +1,45 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.event;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.event.EventType;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
@Component
public class EventPartitionConfiguration {
@Value("${sql.events.partition_size:168}")
private int regularPartitionSizeInHours;
@Value("${sql.events.debug_partition_size:1}")
private int debugPartitionSizeInHours;
private long regularPartitionSizeInMs;
private long debugPartitionSizeInMs;
@PostConstruct
public void init() {
regularPartitionSizeInMs = TimeUnit.HOURS.toMillis(regularPartitionSizeInHours);
debugPartitionSizeInMs = TimeUnit.HOURS.toMillis(debugPartitionSizeInHours);
}
public long getPartitionSizeInMs(EventType eventType) {
return eventType.isDebug() ? debugPartitionSizeInMs : regularPartitionSizeInMs;
}
}

View File

@ -28,137 +28,12 @@ import org.thingsboard.server.dao.model.sql.EventEntity;
import java.util.List;
import java.util.UUID;
/**
* Created by Valerii Sosliuk on 5/3/2017.
*/
public interface EventRepository<T extends EventEntity<V>, V extends Event> {
List<T> findLatestEvents(UUID tenantId, UUID entityId, int limit);
Page<T> findEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime, Pageable pageable);
//
// @Query(nativeQuery = true,
// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = :eventType " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:type IS NULL OR lower(json_body->>'type') LIKE concat('%', lower(:type\\:\\:varchar), '%')) " +
// "AND (:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:entityName IS NULL OR lower(json_body->>'entityName') LIKE concat('%', lower(:entityName\\:\\:varchar), '%')) " +
// "AND (:relationType IS NULL OR lower(json_body->>'relationType') LIKE concat('%', lower(:relationType\\:\\:varchar), '%')) " +
// "AND (:bodyEntityId IS NULL OR lower(json_body->>'entityId') LIKE concat('%', lower(:bodyEntityId\\:\\:varchar), '%')) " +
// "AND (:msgType IS NULL OR lower(json_body->>'msgType') LIKE concat('%', lower(:msgType\\:\\:varchar), '%')) " +
// "AND ((:isError = FALSE) OR (json_body->>'error') IS NOT NULL) " +
// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%')) " +
// "AND (:data IS NULL OR lower(json_body->>'data') LIKE concat('%', lower(:data\\:\\:varchar), '%')) " +
// "AND (:metadata IS NULL OR lower(json_body->>'metadata') LIKE concat('%', lower(:metadata\\:\\:varchar), '%')) ",
// countQuery = "SELECT count(*) FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = :eventType " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:type IS NULL OR lower(json_body->>'type') LIKE concat('%', lower(:type\\:\\:varchar), '%')) " +
// "AND (:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:entityName IS NULL OR lower(json_body->>'entityName') LIKE concat('%', lower(:entityName\\:\\:varchar), '%')) " +
// "AND (:relationType IS NULL OR lower(json_body->>'relationType') LIKE concat('%', lower(:relationType\\:\\:varchar), '%')) " +
// "AND (:bodyEntityId IS NULL OR lower(json_body->>'entityId') LIKE concat('%', lower(:bodyEntityId\\:\\:varchar), '%')) " +
// "AND (:msgType IS NULL OR lower(json_body->>'msgType') LIKE concat('%', lower(:msgType\\:\\:varchar), '%')) " +
// "AND ((:isError = FALSE) OR (json_body->>'error') IS NOT NULL) " +
// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%')) " +
// "AND (:data IS NULL OR lower(json_body->>'data') LIKE concat('%', lower(:data\\:\\:varchar), '%')) " +
// "AND (:metadata IS NULL OR lower(json_body->>'metadata') LIKE concat('%', lower(:metadata\\:\\:varchar), '%'))"
// )
// Page<EventEntity> findDebugRuleNodeEvents(@Param("tenantId") UUID tenantId,
// @Param("entityId") UUID entityId,
// @Param("entityType") String entityType,
// @Param("eventType") String eventType,
// @Param("startTime") Long startTime,
// @Param("endTime") Long endTime,
// @Param("type") String type,
// @Param("server") String server,
// @Param("entityName") String entityName,
// @Param("relationType") String relationType,
// @Param("bodyEntityId") String bodyEntityId,
// @Param("msgType") String msgType,
// @Param("isError") boolean isError,
// @Param("error") String error,
// @Param("data") String data,
// @Param("metadata") String metadata,
// Pageable pageable);
//
// @Query(nativeQuery = true,
// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = 'ERROR' " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:method IS NULL OR lower(json_body->>'method') LIKE concat('%', lower(:method\\:\\:varchar), '%')) " +
// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%'))",
// countQuery = "SELECT count(*) FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = 'ERROR' " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:server IS NULL OR lower(json_body->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:method IS NULL OR lower(json_body->>'method') LIKE concat('%', lower(:method\\:\\:varchar), '%')) " +
// "AND (:error IS NULL OR lower(json_body->>'error') LIKE concat('%', lower(:error\\:\\:varchar), '%'))")
// Page<EventEntity> findErrorEvents(@Param("tenantId") UUID tenantId,
// @Param("entityId") UUID entityId,
// @Param("entityType") String entityType,
// @Param("startTime") Long startTime,
// @Param("endTime") Long endTIme,
// @Param("server") String server,
// @Param("method") String method,
// @Param("error") String error,
// Pageable pageable);
//
//
// @Query(nativeQuery = true,
// value = "SELECT e.id, e.created_time, e.body, e.entity_id, e.entity_type, e.event_type, e.event_uid, e.tenant_id, ts FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = 'STATS' " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:server IS NULL OR lower(e.body\\:\\:json->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:messagesProcessed = 0 OR (json_body->>'messagesProcessed')\\:\\:integer >= :messagesProcessed) " +
// "AND (:errorsOccurred = 0 OR (json_body->>'errorsOccurred')\\:\\:integer >= :errorsOccurred) ",
// countQuery = "SELECT count(*) FROM " +
// "(SELECT *, e.body\\:\\:jsonb as json_body FROM event e WHERE " +
// "e.tenant_id = :tenantId " +
// "AND e.entity_type = :entityType " +
// "AND e.entity_id = :entityId " +
// "AND e.event_type = 'LC_EVENT' " +
// "AND e.created_time >= :startTime AND (:endTime = 0 OR e.created_time <= :endTime) " +
// ") AS e WHERE " +
// "(:server IS NULL OR lower(e.body\\:\\:json->>'server') LIKE concat('%', lower(:server\\:\\:varchar), '%')) " +
// "AND (:messagesProcessed = 0 OR (json_body->>'messagesProcessed')\\:\\:integer >= :messagesProcessed) " +
// "AND (:errorsOccurred = 0 OR (json_body->>'errorsOccurred')\\:\\:integer >= :errorsOccurred) ")
// Page<EventEntity> findStatisticsEvents(@Param("tenantId") UUID tenantId,
// @Param("entityId") UUID entityId,
// @Param("entityType") String entityType,
// @Param("startTime") Long startTime,
// @Param("endTime") Long endTIme,
// @Param("server") String server,
// @Param("messagesProcessed") Integer messagesProcessed,
// @Param("errorsOccurred") Integer errorsOccurred,
// Pageable pageable);
//
void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime);
}

View File

@ -48,13 +48,11 @@ import org.thingsboard.server.dao.timeseries.SqlPartition;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -65,12 +63,12 @@ import java.util.function.Function;
@Component
public class JpaBaseEventDao implements EventDao {
public static final long REGULAR_PARTITION_DURATION = TimeUnit.DAYS.toMillis(1);
public static final long DEBUG_PARTITION_DURATION = TimeUnit.HOURS.toMillis(1);
private final Map<EventType, Map<Long, SqlPartition>> partitionsByEventType = new ConcurrentHashMap<>();
private static final ReentrantLock partitionCreationLock = new ReentrantLock();
@Autowired
private EventPartitionConfiguration partitionConfiguration;
@Autowired
private SqlPartitioningRepository partitioningRepository;
@ -170,12 +168,12 @@ public class JpaBaseEventDao implements EventDao {
}
private void savePartitionIfNotExist(Event event) {
var partitionsMap = partitionsByEventType.get(event.getType());
var partitionDuration = event.getType().isDebug() ? DEBUG_PARTITION_DURATION : REGULAR_PARTITION_DURATION;
EventType type = event.getType();
var partitionsMap = partitionsByEventType.get(type);
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type);
long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration);
if (partitionsMap.get(partitionStartTs) == null) {
savePartition(partitionsMap, new SqlPartition(event.getType().getTable(), partitionStartTs,
partitionStartTs + partitionDuration, Long.toString(partitionStartTs)));
savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs)));
}
}
@ -228,6 +226,42 @@ public class JpaBaseEventDao implements EventDao {
}
}
@Override
public void removeEvents(UUID tenantId, UUID entityId, Long startTime, Long endTime) {
log.debug("[{}][{}] Remove events [{}-{}] ", tenantId, entityId, startTime, endTime);
for (EventType eventType : EventType.values()) {
getEventRepository(eventType).removeEvents(tenantId, entityId, startTime, endTime);
}
}
@Override
public void removeEvents(UUID tenantId, UUID entityId, EventFilter eventFilter, Long startTime, Long endTime) {
if (eventFilter.isNotEmpty()) {
switch (eventFilter.getEventType()) {
case DEBUG_RULE_NODE:
removeEventsByFilter(tenantId, entityId, (RuleNodeDebugEventFilter) eventFilter, startTime, endTime);
break;
case DEBUG_RULE_CHAIN:
removeEventsByFilter(tenantId, entityId, (RuleChainDebugEventFilter) eventFilter, startTime, endTime);
break;
case LC_EVENT:
removeEventsByFilter(tenantId, entityId, (LifeCycleEventFilter) eventFilter, startTime, endTime);
break;
case ERROR:
removeEventsByFilter(tenantId, entityId, (ErrorEventFilter) eventFilter, startTime, endTime);
break;
case STATS:
removeEventsByFilter(tenantId, entityId, (StatisticsEventFilter) eventFilter, startTime, endTime);
break;
default:
throw new RuntimeException("Not supported event type: " + eventFilter.getEventType());
}
} else {
getEventRepository(eventFilter.getEventType()).removeEvents(tenantId, entityId, startTime, endTime);
}
}
private PageData<? extends Event> findEventByFilter(UUID tenantId, UUID entityId, RuleChainDebugEventFilter eventFilter, TimePageLink pageLink) {
return DaoUtil.toPageData(
ruleChainDebugEventRepository.findEvents(
@ -305,12 +339,88 @@ public class JpaBaseEventDao implements EventDao {
pageLink.getStartTime(),
pageLink.getEndTime(),
eventFilter.getServer(),
eventFilter.getMessagesProcessed(),
eventFilter.getErrorsOccurred(),
eventFilter.getMinMessagesProcessed(),
eventFilter.getMaxMessagesProcessed(),
eventFilter.getMinErrorsOccurred(),
eventFilter.getMaxErrorsOccurred(),
DaoUtil.toPageable(pageLink))
);
}
private void removeEventsByFilter(UUID tenantId, UUID entityId, RuleChainDebugEventFilter eventFilter, Long startTime, Long endTime) {
ruleChainDebugEventRepository.removeEvents(
tenantId,
entityId,
startTime,
endTime,
eventFilter.getServer(),
eventFilter.getMessage(),
eventFilter.isError(),
eventFilter.getErrorStr());
}
private void removeEventsByFilter(UUID tenantId, UUID entityId, RuleNodeDebugEventFilter eventFilter, Long startTime, Long endTime) {
parseUUID(eventFilter.getEntityId(), "Entity Id");
parseUUID(eventFilter.getMsgId(), "Message Id");
ruleNodeDebugEventRepository.removeEvents(
tenantId,
entityId,
startTime,
endTime,
eventFilter.getServer(),
eventFilter.getMsgDirectionType(),
eventFilter.getEntityId(),
eventFilter.getEntityType(),
eventFilter.getMsgId(),
eventFilter.getMsgType(),
eventFilter.getRelationType(),
eventFilter.getDataSearch(),
eventFilter.getMetadataSearch(),
eventFilter.isError(),
eventFilter.getErrorStr());
}
private void removeEventsByFilter(UUID tenantId, UUID entityId, ErrorEventFilter eventFilter, Long startTime, Long endTime) {
errorEventRepository.removeEvents(
tenantId,
entityId,
startTime,
endTime,
eventFilter.getServer(),
eventFilter.getMethod(),
eventFilter.getErrorStr());
}
private void removeEventsByFilter(UUID tenantId, UUID entityId, LifeCycleEventFilter eventFilter, Long startTime, Long endTime) {
boolean statusFilterEnabled = !StringUtils.isEmpty(eventFilter.getStatus());
boolean statusFilter = statusFilterEnabled && eventFilter.getStatus().equalsIgnoreCase("Success");
lcEventRepository.removeEvents(
tenantId,
entityId,
startTime,
endTime,
eventFilter.getServer(),
eventFilter.getEvent(),
statusFilterEnabled,
statusFilter,
eventFilter.getErrorStr());
}
private void removeEventsByFilter(UUID tenantId, UUID entityId, StatisticsEventFilter eventFilter, Long startTime, Long endTime) {
statsEventRepository.removeEvents(
tenantId,
entityId,
startTime,
endTime,
eventFilter.getServer(),
eventFilter.getMinMessagesProcessed(),
eventFilter.getMaxMessagesProcessed(),
eventFilter.getMinErrorsOccurred(),
eventFilter.getMaxErrorsOccurred()
);
}
@Override
public List<? extends Event> findLatestEvents(UUID tenantId, UUID entityId, EventType eventType, int limit) {
return DaoUtil.convertDataList(getEventRepository(eventType).findLatestEvents(tenantId, entityId, limit));

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.dao.model.sql.LifecycleEventEntity;
@ -77,4 +79,39 @@ public interface LifecycleEventRepository extends EventRepository<LifecycleEvent
@Param("error") String error,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM LifecycleEventEntity e WHERE " +
"e.tenantId = :tenantId " +
"AND e.entityId = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime);
@Transactional
@Modifying
@Query(nativeQuery = true,
value = "DELETE FROM lc_event e WHERE " +
"e.tenant_id = :tenantId " +
"AND e.entity_id = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:eventType IS NULL OR e.e_type ILIKE concat('%', :eventType, '%')) " +
"AND ((:statusFilterEnabled = FALSE) OR e.e_success = :statusFilter) " +
"AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("eventType") String eventType,
@Param("statusFilterEnabled") boolean statusFilterEnabled,
@Param("statusFilter") boolean statusFilter,
@Param("error") String error);
}

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.event.RuleChainDebugEvent;
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
import org.thingsboard.server.dao.model.sql.RuleChainDebugEventEntity;
@ -43,10 +45,10 @@ public interface RuleChainDebugEventRepository extends EventRepository<RuleChain
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
Page<RuleChainDebugEventEntity> findEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
Pageable pageable);
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
Pageable pageable);
@Query(nativeQuery = true,
value = "SELECT * FROM rule_chain_debug_event e WHERE " +
@ -70,13 +72,46 @@ public interface RuleChainDebugEventRepository extends EventRepository<RuleChain
"AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))"
)
Page<RuleChainDebugEventEntity> findEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("message") String message,
@Param("isError") boolean isError,
@Param("error") String error,
Pageable pageable);
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("message") String message,
@Param("isError") boolean isError,
@Param("error") String error,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM RuleChainDebugEventEntity e WHERE " +
"e.tenantId = :tenantId " +
"AND e.entityId = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime);
@Transactional
@Modifying
@Query(nativeQuery = true,
value = "DELETE FROM rule_chain_debug_event e WHERE " +
"e.tenant_id = :tenantId " +
"AND e.entity_id = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:message IS NULL OR e.e_message ILIKE concat('%', :message, '%')) " +
"AND ((:isError = FALSE) OR e.e_error IS NOT NULL) " +
"AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))")
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("message") String message,
@Param("isError") boolean isError,
@Param("error") String error);
}

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
import org.thingsboard.server.dao.model.sql.ErrorEventEntity;
@ -100,4 +102,51 @@ public interface RuleNodeDebugEventRepository extends EventRepository<RuleNodeDe
@Param("error") String error,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM RuleNodeDebugEventEntity e WHERE " +
"e.tenantId = :tenantId " +
"AND e.entityId = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime);
@Transactional
@Modifying
@Query(nativeQuery = true,
value = "DELETE FROM rule_node_debug_event e WHERE " +
"e.tenant_id = :tenantId " +
"AND e.entity_id = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:eventType IS NULL OR e.e_type ILIKE concat('%', :eventType, '%')) " +
"AND (:eventEntityId IS NULL OR e.e_entity_id = uuid(:eventEntityId)) " +
"AND (:eventEntityType IS NULL OR e.e_entity_type ILIKE concat('%', :eventEntityType, '%')) " +
"AND (:msgId IS NULL OR e.e_msg_id = uuid(:msgId)) " +
"AND (:msgType IS NULL OR e.e_msg_type ILIKE concat('%', :msgType, '%')) " +
"AND (:relationType IS NULL OR e.e_relation_type ILIKE concat('%', :relationType, '%')) " +
"AND (:data IS NULL OR e.e_data ILIKE concat('%', :data, '%')) " +
"AND (:metadata IS NULL OR e.e_metadata ILIKE concat('%', :metadata, '%')) " +
"AND ((:isError = FALSE) OR e.e_error IS NOT NULL) " +
"AND (:error IS NULL OR e.e_error ILIKE concat('%', :error, '%'))")
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("eventType") String type,
@Param("eventEntityId") String eventEntityId,
@Param("eventEntityType") String eventEntityType,
@Param("msgId") String eventMsgId,
@Param("msgType") String eventMsgType,
@Param("relationType") String relationType,
@Param("data") String data,
@Param("metadata") String metadata,
@Param("isError") boolean isError,
@Param("error") String error);
}

View File

@ -16,7 +16,7 @@
package org.thingsboard.server.dao.sql.event;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.util.PSQLException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.common.data.event.EventType;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
@ -28,8 +28,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import static org.thingsboard.server.dao.sql.event.JpaBaseEventDao.DEBUG_PARTITION_DURATION;
import static org.thingsboard.server.dao.sql.event.JpaBaseEventDao.REGULAR_PARTITION_DURATION;
@Slf4j
@Repository
@ -38,6 +36,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')";
private static final int PSQL_VERSION_14 = 140000;
@Autowired
private EventPartitionConfiguration partitionConfiguration;
private volatile Integer currentServerVersion;
@Override
@ -50,7 +51,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
}
private void cleanupEvents(EventType eventType, long eventExpTime) {
var partitionDuration = eventType.isDebug() ? DEBUG_PARTITION_DURATION : REGULAR_PARTITION_DURATION;
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType);
List<Long> partitions = fetchPartitions(eventType);
for (var partitionTs : partitions) {
var partitionEndTs = partitionTs + partitionDuration;

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.dao.sql.event;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.event.StatisticsEvent;
import org.thingsboard.server.dao.model.sql.StatisticsEventEntity;
@ -51,8 +53,10 @@ public interface StatisticsEventRepository extends EventRepository<StatisticsEve
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:messagesProcessed IS NULL OR e.e_messages_processed >= :messagesProcessed) " +
"AND (:errorsOccurred IS NULL OR e.e_errors_occurred >= :errorsOccurred)"
"AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " +
"AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " +
"AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " +
"AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)"
,
countQuery = "SELECT count(*) FROM stats_event e WHERE " +
"e.tenant_id = :tenantId " +
@ -60,16 +64,57 @@ public interface StatisticsEventRepository extends EventRepository<StatisticsEve
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:messagesProcessed IS NULL OR e.e_messages_processed >= :messagesProcessed) " +
"AND (:errorsOccurred IS NULL OR e.e_errors_occurred >= :errorsOccurred)"
"AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " +
"AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " +
"AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " +
"AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)"
)
Page<StatisticsEventEntity> findEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("messagesProcessed") Integer messagesProcessed,
@Param("errorsOccurred") Integer errorsOccurred,
@Param("minMessagesProcessed") Integer minMessagesProcessed,
@Param("maxMessagesProcessed") Integer maxMessagesProcessed,
@Param("minErrorsOccurred") Integer minErrorsOccurred,
@Param("maxErrorsOccurred") Integer maxErrorsOccurred,
Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM StatisticsEventEntity e WHERE " +
"e.tenantId = :tenantId " +
"AND e.entityId = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime);
@Transactional
@Modifying
@Query(nativeQuery = true,
value = "DELETE FROM stats_event e WHERE " +
"e.tenant_id = :tenantId " +
"AND e.entity_id = :entityId " +
"AND (:startTime IS NULL OR e.ts >= :startTime) " +
"AND (:endTime IS NULL OR e.ts <= :endTime) " +
"AND (:serviceId IS NULL OR e.service_id ILIKE concat('%', :serviceId, '%')) " +
"AND (:minMessagesProcessed IS NULL OR e.e_messages_processed >= :minMessagesProcessed) " +
"AND (:maxMessagesProcessed IS NULL OR e.e_messages_processed < :maxMessagesProcessed) " +
"AND (:minErrorsOccurred IS NULL OR e.e_errors_occurred >= :minErrorsOccurred) " +
"AND (:maxErrorsOccurred IS NULL OR e.e_errors_occurred < :maxErrorsOccurred)"
)
void removeEvents(@Param("tenantId") UUID tenantId,
@Param("entityId") UUID entityId,
@Param("startTime") Long startTime,
@Param("endTime") Long endTime,
@Param("serviceId") String server,
@Param("minMessagesProcessed") Integer minMessagesProcessed,
@Param("maxMessagesProcessed") Integer maxMessagesProcessed,
@Param("minErrorsOccurred") Integer minErrorsOccurred,
@Param("maxErrorsOccurred") Integer maxErrorsOccurred);
}