Merge pull request #7704 from ViacheslavKlimov/fix/concurrent-partition-creation-errors

[3.4.2] Fix issues with partition creation
This commit is contained in:
Andrew Shvayka 2022-11-30 14:33:03 +02:00 committed by GitHub
commit dfdb2e5bf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 24 deletions

View File

@ -88,6 +88,7 @@ import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest;
import org.thingsboard.server.service.security.auth.rest.LoginRequest; import org.thingsboard.server.service.security.auth.rest.LoginRequest;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -768,4 +769,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
throw new AssertionError("Unexpected status " + mvcResult.getResponse().getStatus()); throw new AssertionError("Unexpected status " + mvcResult.getResponse().getStatus());
} }
protected <T> T getFieldValue(Object target, String fieldName) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(target);
}
} }

View File

@ -44,6 +44,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -202,6 +203,21 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
}); });
} }
@Test
public void whenSavingAuditLogAndPartitionSaveErrorOccurred_thenSaveAuditLogAnyway() throws Exception {
// creating partition bigger than sql.audit_logs.partition_size
partitioningRepository.createPartitionIfNotExists("audit_log", System.currentTimeMillis(), TimeUnit.DAYS.toMillis(7));
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
assertThat(partitions).size().isOne();
partitioningRepository.cleanupPartitionsCache("audit_log", System.currentTimeMillis(), 0);
assertDoesNotThrow(() -> {
// expecting partition overlap error on partition save
createAuditLog(ActionType.LOGIN, tenantAdminUserId);
});
assertThat(partitioningRepository.fetchPartitions("audit_log")).isEqualTo(partitions);
}
private AuditLog createAuditLog(ActionType actionType, EntityId entityId) { private AuditLog createAuditLog(ActionType actionType, EntityId entityId) {
AuditLog auditLog = new AuditLog(); AuditLog auditLog = new AuditLog();
auditLog.setTenantId(tenantId); auditLog.setTenantId(tenantId);

View File

@ -31,7 +31,6 @@ import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.DaoSqlTest;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -217,10 +216,4 @@ class TbelInvokeServiceTest extends AbstractControllerTest {
return invokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, msg, "{}", "POST_TELEMETRY_REQUEST").get().toString(); return invokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, msg, "{}", "POST_TELEMETRY_REQUEST").get().toString();
} }
private <T> T getFieldValue(Object target, String fieldName) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(target);
}
} }

View File

@ -16,16 +16,16 @@
package org.thingsboard.server.dao.sqlts.insert.sql; package org.thingsboard.server.dao.sqlts.insert.sql;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException; import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.timeseries.SqlPartition; import org.thingsboard.server.dao.timeseries.SqlPartition;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -36,9 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
public class SqlPartitioningRepository { public class SqlPartitioningRepository {
@PersistenceContext
private EntityManager entityManager;
@Autowired @Autowired
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
@ -50,12 +47,12 @@ public class SqlPartitioningRepository {
private final Map<String, Map<Long, SqlPartition>> tablesPartitions = new ConcurrentHashMap<>(); private final Map<String, Map<Long, SqlPartition>> tablesPartitions = new ConcurrentHashMap<>();
private final ReentrantLock partitionCreationLock = new ReentrantLock(); private final ReentrantLock partitionCreationLock = new ReentrantLock();
@Transactional @Transactional(propagation = Propagation.NOT_SUPPORTED)
public void save(SqlPartition partition) { public void save(SqlPartition partition) {
entityManager.createNativeQuery(partition.getQuery()).executeUpdate(); jdbcTemplate.execute(partition.getQuery());
} }
@Transactional @Transactional(propagation = Propagation.NOT_SUPPORTED) // executing non-transactionally, so that parent transaction is not aborted on partition save error
public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) { public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) {
long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs); long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs);
Map<Long, SqlPartition> partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>()); Map<Long, SqlPartition> partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
@ -64,20 +61,16 @@ public class SqlPartitioningRepository {
partitionCreationLock.lock(); partitionCreationLock.lock();
try { try {
if (partitions.containsKey(partitionStartTs)) return; if (partitions.containsKey(partitionStartTs)) return;
log.trace("Saving partition: {}", partition); log.info("Saving partition {}-{} for table {}", partition.getStart(), partition.getEnd(), table);
save(partition); save(partition);
log.trace("Adding partition to map: {}", partition); log.trace("Adding partition to map: {}", partition);
partitions.put(partition.getStart(), partition); partitions.put(partition.getStart(), partition);
} catch (RuntimeException e) { } catch (Exception e) {
log.trace("Error occurred during partition save:", e); String error = ExceptionUtils.getRootCauseMessage(e);
String msg = ExceptionUtils.getRootCauseMessage(e); if (StringUtils.containsAny(error, "would overlap partition", "already exists")) {
if (msg.contains("would overlap partition")) {
log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}",
partition.getPartitionDate(), table, msg);
partitions.put(partition.getStart(), partition); partitions.put(partition.getStart(), partition);
} else {
throw e;
} }
log.warn("Couldn't save partition {}-{} for table {}: {}", partition.getStart(), partition.getEnd(), table, error);
} finally { } finally {
partitionCreationLock.unlock(); partitionCreationLock.unlock();
} }