SQL batch sort enabled by default; additional deadlock logging
This commit is contained in:
parent
d7aa2e5660
commit
35960d018e
@ -268,7 +268,7 @@ sql:
|
|||||||
audit_logs:
|
audit_logs:
|
||||||
partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
|
partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
|
||||||
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
||||||
batch_sort: "${SQL_BATCH_SORT:false}"
|
batch_sort: "${SQL_BATCH_SORT:true}"
|
||||||
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
||||||
remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
|
remove_null_chars: "${SQL_REMOVE_NULL_CHARS:true}"
|
||||||
# Specify whether to log database queries and their parameters generated by entity query repository
|
# Specify whether to log database queries and their parameters generated by entity query repository
|
||||||
|
|||||||
@ -146,6 +146,16 @@ public class StringUtils {
|
|||||||
return org.apache.commons.lang3.StringUtils.substringAfterLast(str, sep);
|
return org.apache.commons.lang3.StringUtils.substringAfterLast(str, sep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static boolean containedByAny(String searchString, String... strings) {
|
||||||
|
if (searchString == null) return false;
|
||||||
|
for (String string : strings) {
|
||||||
|
if (string != null && string.contains(searchString)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
public static String randomNumeric(int length) {
|
public static String randomNumeric(int length) {
|
||||||
return RandomStringUtils.randomNumeric(length);
|
return RandomStringUtils.randomNumeric(length);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import org.aspectj.lang.ProceedingJoinPoint;
|
import org.aspectj.lang.ProceedingJoinPoint;
|
||||||
import org.aspectj.lang.annotation.Around;
|
import org.aspectj.lang.annotation.Around;
|
||||||
@ -29,9 +28,11 @@ import org.aspectj.lang.annotation.Aspect;
|
|||||||
import org.aspectj.lang.reflect.MethodSignature;
|
import org.aspectj.lang.reflect.MethodSignature;
|
||||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||||
import org.hibernate.exception.JDBCConnectionException;
|
import org.hibernate.exception.JDBCConnectionException;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -46,6 +47,8 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.commons.lang3.StringUtils.join;
|
||||||
|
|
||||||
@Aspect
|
@Aspect
|
||||||
@ConditionalOnProperty(prefix = "sql", value = "log_tenant_stats", havingValue = "true")
|
@ConditionalOnProperty(prefix = "sql", value = "log_tenant_stats", havingValue = "true")
|
||||||
@Component
|
@Component
|
||||||
@ -55,6 +58,12 @@ public class SqlDaoCallsAspect {
|
|||||||
private final Set<String> invalidTenantDbCallMethods = ConcurrentHashMap.newKeySet();
|
private final Set<String> invalidTenantDbCallMethods = ConcurrentHashMap.newKeySet();
|
||||||
private final ConcurrentMap<TenantId, DbCallStats> statsMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<TenantId, DbCallStats> statsMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Value("${sql.batch_sort:true}")
|
||||||
|
private boolean batchSortEnabled;
|
||||||
|
|
||||||
|
private static final String DEADLOCK_DETECTED_ERROR = "deadlock detected";
|
||||||
|
|
||||||
|
|
||||||
@Scheduled(initialDelayString = "${sql.log_tenant_stats_interval_ms:60000}",
|
@Scheduled(initialDelayString = "${sql.log_tenant_stats_interval_ms:60000}",
|
||||||
fixedDelayString = "${sql.log_tenant_stats_interval_ms:60000}")
|
fixedDelayString = "${sql.log_tenant_stats_interval_ms:60000}")
|
||||||
public void printStats() {
|
public void printStats() {
|
||||||
@ -135,7 +144,7 @@ public class SqlDaoCallsAspect {
|
|||||||
|
|
||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
@Around("@within(org.thingsboard.server.dao.util.SqlDao)")
|
@Around("@within(org.thingsboard.server.dao.util.SqlDao)")
|
||||||
public Object logExecutionTime(ProceedingJoinPoint joinPoint) throws Throwable {
|
public Object handleSqlCall(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||||
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
|
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
|
||||||
var methodName = signature.toShortString();
|
var methodName = signature.toShortString();
|
||||||
if (invalidTenantDbCallMethods.contains(methodName)) {
|
if (invalidTenantDbCallMethods.contains(methodName)) {
|
||||||
@ -155,29 +164,47 @@ public class SqlDaoCallsAspect {
|
|||||||
new FutureCallback<>() {
|
new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Object result) {
|
public void onSuccess(@Nullable Object result) {
|
||||||
logTenantMethodExecution(tenantId, methodName, true, startTime, null);
|
reportSuccessfulMethodExecution(tenantId, methodName, startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
logTenantMethodExecution(tenantId, methodName, false, startTime, t);
|
reportFailedMethodExecution(tenantId, methodName, startTime, t, joinPoint);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
MoreExecutors.directExecutor());
|
MoreExecutors.directExecutor());
|
||||||
} else {
|
} else {
|
||||||
logTenantMethodExecution(tenantId, methodName, true, startTime, null);
|
reportSuccessfulMethodExecution(tenantId, methodName, startTime);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logTenantMethodExecution(tenantId, methodName, false, startTime, t);
|
reportFailedMethodExecution(tenantId, methodName, startTime, t, joinPoint);
|
||||||
throw t;
|
throw t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logTenantMethodExecution(TenantId tenantId, String method, boolean success, long startTime, Throwable t) {
|
private void reportFailedMethodExecution(TenantId tenantId, String method, long startTime, Throwable t, ProceedingJoinPoint joinPoint) {
|
||||||
if (!success && ExceptionUtils.indexOfThrowable(t, JDBCConnectionException.class) >= 0) {
|
if (t != null) {
|
||||||
|
if (ExceptionUtils.indexOfThrowable(t, JDBCConnectionException.class) >= 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (StringUtils.containedByAny(DEADLOCK_DETECTED_ERROR, ExceptionUtils.getRootCauseMessage(t), ExceptionUtils.getMessage(t))) {
|
||||||
|
if (!batchSortEnabled) {
|
||||||
|
log.warn("Deadlock was detected for method {} (tenant: {}). You might need to enable 'sql.batch_sort' option.", method, tenantId);
|
||||||
|
} else {
|
||||||
|
log.error("Deadlock was detected for method {} (tenant: {}). Arguments passed: \n{}\n The error: ",
|
||||||
|
method, tenantId, join(joinPoint.getArgs(), System.lineSeparator()), t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reportMethodExecution(tenantId, method, false, startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reportSuccessfulMethodExecution(TenantId tenantId, String method, long startTime) {
|
||||||
|
reportMethodExecution(tenantId, method, true, startTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reportMethodExecution(TenantId tenantId, String method, boolean success, long startTime) {
|
||||||
statsMap.computeIfAbsent(tenantId, DbCallStats::new)
|
statsMap.computeIfAbsent(tenantId, DbCallStats::new)
|
||||||
.onMethodCall(method, success, System.currentTimeMillis() - startTime);
|
.onMethodCall(method, success, System.currentTimeMillis() - startTime);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,6 +25,7 @@ import org.springframework.transaction.TransactionStatus;
|
|||||||
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
|
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
|
||||||
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
|
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
@ -35,6 +36,7 @@ import java.util.regex.Pattern;
|
|||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@SqlDao
|
||||||
public abstract class AttributeKvInsertRepository {
|
public abstract class AttributeKvInsertRepository {
|
||||||
|
|
||||||
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
|
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
|
||||||
@ -58,7 +60,7 @@ public abstract class AttributeKvInsertRepository {
|
|||||||
@Value("${sql.remove_null_chars:true}")
|
@Value("${sql.remove_null_chars:true}")
|
||||||
private boolean removeNullChars;
|
private boolean removeNullChars;
|
||||||
|
|
||||||
protected void saveOrUpdate(List<AttributeKvEntity> entities) {
|
public void saveOrUpdate(List<AttributeKvEntity> entities) {
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
protected void doInTransactionWithoutResult(TransactionStatus status) {
|
||||||
|
|||||||
@ -78,7 +78,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
|
|||||||
@Value("${sql.attributes.batch_threads:4}")
|
@Value("${sql.attributes.batch_threads:4}")
|
||||||
private int batchThreads;
|
private int batchThreads;
|
||||||
|
|
||||||
@Value("${sql.batch_sort:false}")
|
@Value("${sql.batch_sort:true}")
|
||||||
private boolean batchSortEnabled;
|
private boolean batchSortEnabled;
|
||||||
|
|
||||||
private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue;
|
private TbSqlBlockingQueueWrapper<AttributeKvEntity> queue;
|
||||||
|
|||||||
@ -17,9 +17,11 @@ package org.thingsboard.server.dao.sql.attributes;
|
|||||||
|
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@SqlDao
|
||||||
public class SqlAttributesInsertRepository extends AttributeKvInsertRepository {
|
public class SqlAttributesInsertRepository extends AttributeKvInsertRepository {
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.event.LifecycleEvent;
|
|||||||
import org.thingsboard.server.common.data.event.RuleChainDebugEvent;
|
import org.thingsboard.server.common.data.event.RuleChainDebugEvent;
|
||||||
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
|
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
|
||||||
import org.thingsboard.server.common.data.event.StatisticsEvent;
|
import org.thingsboard.server.common.data.event.StatisticsEvent;
|
||||||
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
@ -45,6 +46,7 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@SqlDao
|
||||||
public class EventInsertRepository {
|
public class EventInsertRepository {
|
||||||
|
|
||||||
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
|
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
|
||||||
@ -81,7 +83,7 @@ public class EventInsertRepository {
|
|||||||
"VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
|
"VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING;");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void save(List<Event> entities) {
|
public void save(List<Event> entities) {
|
||||||
Map<EventType, List<Event>> eventsByType = entities.stream().collect(Collectors.groupingBy(Event::getType, Collectors.toList()));
|
Map<EventType, List<Event>> eventsByType = entities.stream().collect(Collectors.groupingBy(Event::getType, Collectors.toList()));
|
||||||
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -106,7 +106,7 @@ public class JpaBaseEventDao implements EventDao {
|
|||||||
@Value("${sql.events.batch_threads:3}")
|
@Value("${sql.events.batch_threads:3}")
|
||||||
private int batchThreads;
|
private int batchThreads;
|
||||||
|
|
||||||
@Value("${sql.batch_sort:false}")
|
@Value("${sql.batch_sort:true}")
|
||||||
private boolean batchSortEnabled;
|
private boolean batchSortEnabled;
|
||||||
|
|
||||||
private TbSqlBlockingQueueWrapper<Event> queue;
|
private TbSqlBlockingQueueWrapper<Event> queue;
|
||||||
|
|||||||
@ -63,7 +63,7 @@ public abstract class AbstractSqlTimeseriesDao extends BaseAbstractSqlTimeseries
|
|||||||
@Value("${sql.timescale.batch_threads:4}")
|
@Value("${sql.timescale.batch_threads:4}")
|
||||||
protected int timescaleBatchThreads;
|
protected int timescaleBatchThreads;
|
||||||
|
|
||||||
@Value("${sql.batch_sort:false}")
|
@Value("${sql.batch_sort:true}")
|
||||||
protected boolean batchSortEnabled;
|
protected boolean batchSortEnabled;
|
||||||
|
|
||||||
@Value("${sql.ttl.ts.ts_key_value_ttl:0}")
|
@Value("${sql.ttl.ts.ts_key_value_ttl:0}")
|
||||||
|
|||||||
@ -94,7 +94,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
@Value("${sql.ts_latest.batch_threads:4}")
|
@Value("${sql.ts_latest.batch_threads:4}")
|
||||||
private int tsLatestBatchThreads;
|
private int tsLatestBatchThreads;
|
||||||
|
|
||||||
@Value("${sql.batch_sort:false}")
|
@Value("${sql.batch_sort:true}")
|
||||||
protected boolean batchSortEnabled;
|
protected boolean batchSortEnabled;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import org.springframework.transaction.support.TransactionCallbackWithoutResult;
|
|||||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
||||||
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
|
import org.thingsboard.server.dao.sqlts.insert.AbstractInsertRepository;
|
||||||
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
||||||
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
|
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
|
||||||
|
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
@ -36,6 +37,7 @@ import java.util.List;
|
|||||||
@SqlTsLatestAnyDao
|
@SqlTsLatestAnyDao
|
||||||
@Repository
|
@Repository
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@SqlDao
|
||||||
public class SqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository {
|
public class SqlLatestInsertTsRepository extends AbstractInsertRepository implements InsertLatestTsRepository {
|
||||||
|
|
||||||
@Value("${sql.ts_latest.update_by_latest_ts:true}")
|
@Value("${sql.ts_latest.update_by_latest_ts:true}")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user