Automatic migration of the events during upgrade
This commit is contained in:
parent
a339fe88a6
commit
07de58fe21
@ -224,6 +224,7 @@ public class ThingsboardInstallService {
|
||||
case "3.4.0":
|
||||
log.info("Upgrading ThingsBoard from version 3.4.0 to 3.4.1 ...");
|
||||
databaseEntitiesUpgradeService.upgradeDatabase("3.4.0");
|
||||
dataUpdateService.updateData("3.4.0");
|
||||
log.info("Updating system data...");
|
||||
systemDataLoaderService.updateSystemWidgets();
|
||||
break;
|
||||
|
||||
@ -25,6 +25,7 @@ import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.id.QueueId;
|
||||
@ -678,5 +679,4 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -58,6 +58,7 @@ import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.alarm.AlarmDao;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
import org.thingsboard.server.dao.model.sql.DeviceProfileEntity;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.relation.RelationService;
|
||||
@ -128,6 +129,9 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
@Autowired
|
||||
private SystemDataLoaderService systemDataLoaderService;
|
||||
|
||||
@Autowired
|
||||
private EventService eventService;
|
||||
|
||||
@Override
|
||||
public void updateData(String fromVersion) throws Exception {
|
||||
switch (fromVersion) {
|
||||
@ -159,6 +163,11 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
tenantsProfileQueueConfigurationUpdater.updateEntities();
|
||||
rateLimitsUpdater.updateEntities();
|
||||
break;
|
||||
case "3.4.0":
|
||||
if (System.getProperty("TB_EVENTS_MIGRATION", "false").equalsIgnoreCase("true")) {
|
||||
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
|
||||
eventService.migrateEvents();
|
||||
}
|
||||
default:
|
||||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
||||
}
|
||||
@ -602,7 +611,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to update tenant profile queue configuration name=["+profile.getName()+"], id=["+ profile.getId().getId() +"]", e);
|
||||
log.error("Failed to update tenant profile queue configuration name=[" + profile.getName() + "], id=[" + profile.getId().getId() + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -43,4 +43,5 @@ public interface EventService {
|
||||
|
||||
void cleanupEvents(long regularEventExpTs, long debugEventExpTs, boolean cleanupDb);
|
||||
|
||||
void migrateEvents();
|
||||
}
|
||||
|
||||
@ -45,6 +45,11 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
public class BaseEventService implements EventService {
|
||||
|
||||
@Value("${sql.ttl.events.events_ttl:0}")
|
||||
private long ttlInSec;
|
||||
@Value("${sql.ttl.events.debug_events_ttl:604800}")
|
||||
private long debugTtlInSec;
|
||||
|
||||
@Value("${event.debug.max-symbols:4096}")
|
||||
private int maxDebugEventSymbols;
|
||||
|
||||
@ -129,6 +134,11 @@ public class BaseEventService implements EventService {
|
||||
eventDao.cleanupEvents(regularEventExpTs, debugEventExpTs, cleanupDb);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateEvents() {
|
||||
eventDao.migrateEvents(ttlInSec > 0 ? System.currentTimeMillis() - ttlInSec : 0, debugTtlInSec > 0 ? System.currentTimeMillis() - debugTtlInSec : 0);
|
||||
}
|
||||
|
||||
private PageData<EventInfo> convert(EntityType entityType, PageData<? extends Event> pd) {
|
||||
return new PageData<>(pd.getData() == null ? null :
|
||||
pd.getData().stream().map(e -> e.toInfo(entityType)).collect(Collectors.toList())
|
||||
|
||||
@ -91,4 +91,6 @@ public interface EventDao {
|
||||
* @param endTime
|
||||
*/
|
||||
void removeEvents(UUID tenantId, UUID entityId, EventFilter eventFilter, Long startTime, Long endTime);
|
||||
|
||||
void migrateEvents(long regularEventTs, long debugEventTs);
|
||||
}
|
||||
|
||||
@ -19,4 +19,5 @@ public interface EventCleanupRepository {
|
||||
|
||||
void cleanupEvents(long eventExpTime, boolean debug);
|
||||
|
||||
void migrateEvents(long regularEventTs, long debugEventTs);
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.event.EventType;
|
||||
@ -25,8 +26,10 @@ import java.util.concurrent.TimeUnit;
|
||||
@Component
|
||||
public class EventPartitionConfiguration {
|
||||
|
||||
@Getter
|
||||
@Value("${sql.events.partition_size:168}")
|
||||
private int regularPartitionSizeInHours;
|
||||
@Getter
|
||||
@Value("${sql.events.debug_partition_size:1}")
|
||||
private int debugPartitionSizeInHours;
|
||||
|
||||
|
||||
@ -261,6 +261,10 @@ public class JpaBaseEventDao implements EventDao {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateEvents(long regularEventTs, long debugEventTs) {
|
||||
eventCleanupRepository.migrateEvents(regularEventTs, debugEventTs);
|
||||
}
|
||||
|
||||
private PageData<? extends Event> findEventByFilter(UUID tenantId, UUID entityId, RuleChainDebugEventFilter eventFilter, TimePageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
|
||||
@ -50,6 +50,23 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void migrateEvents(long regularEventTs, long debugEventTs) {
|
||||
callMigrateFunction("migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours());
|
||||
callMigrateFunction("migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours());
|
||||
}
|
||||
|
||||
private void callMigrateFunction(String functionName, long startTs, int partitionSizeInHours) {
|
||||
try (Connection connection = dataSource.getConnection();
|
||||
PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?)")) {
|
||||
stmt.setLong(1, startTs);
|
||||
stmt.setInt(2, partitionSizeInHours);
|
||||
stmt.execute();
|
||||
} catch (SQLException e) {
|
||||
log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupEvents(EventType eventType, long eventExpTime) {
|
||||
var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType);
|
||||
List<Long> partitions = fetchPartitions(eventType);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user