From e6d08e652818842992df8845db5363825146c310 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 29 Jan 2024 11:11:02 +0200 Subject: [PATCH 1/7] 'Create' method for dao (used for audit logs, notifications, alarm comments) --- .../java/org/thingsboard/server/dao/Dao.java | 2 + .../server/dao/sql/JpaAbstractDao.java | 49 +++++++++++++++---- .../dao/sql/alarm/JpaAlarmCommentDao.java | 3 +- .../server/dao/sql/audit/JpaAuditLogDao.java | 3 +- .../sql/notification/JpaNotificationDao.java | 2 +- 5 files changed, 44 insertions(+), 15 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java index 82b5837bcb..e26fdb79ac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java @@ -39,6 +39,8 @@ public interface Dao { T saveAndFlush(TenantId tenantId, T t); + T create(TenantId tenantId, T t); + boolean removeById(TenantId tenantId, UUID id); void removeAllByIds(Collection ids); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 30919da561..58c8bffd07 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -19,14 +19,19 @@ import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.transaction.support.TransactionTemplate; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.BaseEntity; import org.thingsboard.server.dao.util.SqlDao; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -41,6 +46,12 @@ public abstract class JpaAbstractDao, D> extends JpaAbstractDaoListeningExecutorService implements Dao { + @PersistenceContext + private EntityManager entityManager; + + @Autowired + private TransactionTemplate transactionTemplate; + protected abstract Class getEntityClass(); protected abstract JpaRepository getRepository(); @@ -51,6 +62,33 @@ public abstract class JpaAbstractDao, D> @Override @Transactional public D save(TenantId tenantId, D domain) { + E entity = prepare(domain); + entity = getRepository().save(entity); + return DaoUtil.getData(entity); + } + + @Override + @Transactional + public D saveAndFlush(TenantId tenantId, D domain) { + D d = save(tenantId, domain); + getRepository().flush(); + return d; + } + + @Override + public D create(TenantId tenantId, D domain) { + E entity = prepare(domain); + if (TransactionSynchronizationManager.isActualTransactionActive()) { + entityManager.persist(entity); + } else { + transactionTemplate.executeWithoutResult(ts -> { + entityManager.persist(entity); + }); + } + return DaoUtil.getData(entity); + } + + private E prepare(D domain) { E entity; try { entity = getEntityClass().getConstructor(domain.getClass()).newInstance(domain); @@ -65,16 +103,7 @@ public abstract class JpaAbstractDao, D> entity.setUuid(uuid); entity.setCreatedTime(Uuids.unixTimestamp(uuid)); } - entity = getRepository().save(entity); - return DaoUtil.getData(entity); - } - - @Override - @Transactional - public D saveAndFlush(TenantId tenantId, D domain) { - D d = save(tenantId, domain); - getRepository().flush(); - return d; + return entity; } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java index dc31ed868a..f44049c7a1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java @@ -57,8 +57,7 @@ public class JpaAlarmCommentDao extends JpaAbstractDao imp auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); } partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - return super.save(tenantId, auditLog); + return create(tenantId, auditLog); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java index 57a8306fc1..16ca94395e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.sql.notification; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; @@ -60,6 +59,7 @@ public class JpaNotificationDao extends JpaAbstractDao Date: Mon, 29 Jan 2024 11:12:51 +0200 Subject: [PATCH 2/7] Proper SQL error instead of 'could not extract ResultSet' --- .../server/controller/BaseController.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index f12a20094d..414d13a714 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -16,14 +16,16 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.postgresql.util.PSQLException; +import org.postgresql.util.ServerErrorMessage; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.dao.DataAccessException; import org.springframework.http.MediaType; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; @@ -46,7 +48,6 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackageInfo; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; @@ -170,7 +171,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; -import static org.thingsboard.server.common.data.StringUtils.isNotEmpty; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD; import static org.thingsboard.server.controller.UserController.YOU_DON_T_HAVE_PERMISSION_TO_PERFORM_THIS_OPERATION; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -376,9 +377,18 @@ public abstract class BaseController { return new ThingsboardException("Unable to send mail: " + exception.getMessage(), ThingsboardErrorCode.GENERAL); } else if (exception instanceof AsyncRequestTimeoutException) { return new ThingsboardException("Request timeout", ThingsboardErrorCode.GENERAL); - } else { - return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL); + } else if (exception instanceof DataAccessException) { + Throwable rootCause = ExceptionUtils.getRootCause(exception); + if (rootCause instanceof PSQLException) { + String sqlError = Optional.ofNullable(((PSQLException) rootCause).getServerErrorMessage()) + .map(ServerErrorMessage::getMessage).orElse(null); + if (isNotEmpty(sqlError)) { + sqlError = StringUtils.capitalize(sqlError); + return new ThingsboardException(sqlError, ThingsboardErrorCode.GENERAL); + } + } } + return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL); } /** From feb55f636a12c840a0cb8e71e8972bd8976be3b5 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 29 Jan 2024 13:28:24 +0200 Subject: [PATCH 3/7] Refactoring for dao save --- .../server/dao/audit/AuditLogService.java | 2 +- .../java/org/thingsboard/server/dao/Dao.java | 2 - .../dao/alarm/BaseAlarmCommentService.java | 8 ---- .../server/dao/audit/AuditLogDao.java | 2 +- .../server/dao/audit/AuditLogServiceImpl.java | 23 ++++------ .../dao/audit/DummyAuditLogServiceImpl.java | 2 +- .../audit/sink/ElasticsearchAuditLogSink.java | 23 ++++++++++ .../server/dao/sql/JpaAbstractDao.java | 45 ++++++++++--------- .../dao/sql/alarm/JpaAlarmCommentDao.java | 7 ++- .../server/dao/sql/audit/JpaAuditLogDao.java | 19 +++----- .../sql/notification/JpaNotificationDao.java | 14 +++--- 11 files changed, 74 insertions(+), 73 deletions(-) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java index 0545e490da..4004477004 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java @@ -38,7 +38,7 @@ public interface AuditLogService { PageData findAuditLogsByTenantId(TenantId tenantId, List actionTypes, TimePageLink pageLink); - ListenableFuture> logEntityAction( + ListenableFuture logEntityAction( TenantId tenantId, CustomerId customerId, UserId userId, diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java index e26fdb79ac..82b5837bcb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java @@ -39,8 +39,6 @@ public interface Dao { T saveAndFlush(TenantId tenantId, T t); - T create(TenantId tenantId, T t); - boolean removeById(TenantId tenantId, UUID id); void removeAllByIds(Collection ids); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 218d31dd53..6ded31fde3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.alarm; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.ListenableFuture; @@ -33,8 +32,6 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.service.DataValidator; -import java.util.UUID; - import static org.thingsboard.server.dao.service.Validator.validateId; @Service @@ -89,11 +86,6 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al if (alarmComment.getType() == null) { alarmComment.setType(AlarmCommentType.OTHER); } - if (alarmComment.getId() == null) { - UUID uuid = Uuids.timeBased(); - alarmComment.setId(new AlarmCommentId(uuid)); - alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid)); - } return alarmCommentDao.createAlarmComment(tenantId, alarmComment); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index 6182e9d9ee..fcbece7426 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -30,7 +30,7 @@ import java.util.UUID; public interface AuditLogDao extends Dao { - ListenableFuture saveByTenantId(AuditLog auditLog); + ListenableFuture saveByTenantId(AuditLog auditLog); PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index 4ae3c265c5..94a9ac0442 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -15,13 +15,12 @@ */ package org.thingsboard.server.dao.audit; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.audit.ActionStatus; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; -import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -54,7 +52,6 @@ import org.thingsboard.server.dao.service.DataValidator; import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateEntityId; @@ -66,7 +63,6 @@ import static org.thingsboard.server.dao.service.Validator.validateId; public class AuditLogServiceImpl implements AuditLogService { private static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; - private static final int INSERTS_PER_ENTRY = 3; @Autowired private AuditLogLevelFilter auditLogLevelFilter; @@ -115,7 +111,7 @@ public class AuditLogServiceImpl implements AuditLogService { } @Override - public ListenableFuture> + public ListenableFuture logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { if (canLog(entityId.getEntityType(), actionType)) { @@ -370,9 +366,6 @@ public class AuditLogServiceImpl implements AuditLogService { ActionStatus actionStatus, String actionFailureDetails) { AuditLog result = new AuditLog(); - UUID id = Uuids.timeBased(); - result.setId(new AuditLogId(id)); - result.setCreatedTime(Uuids.unixTimestamp(id)); result.setTenantId(tenantId); result.setEntityId(entityId); result.setEntityName(entityName); @@ -386,7 +379,7 @@ public class AuditLogServiceImpl implements AuditLogService { return result; } - private ListenableFuture> logAction(TenantId tenantId, + private ListenableFuture logAction(TenantId tenantId, EntityId entityId, String entityName, CustomerId customerId, @@ -408,12 +401,12 @@ public class AuditLogServiceImpl implements AuditLogService { return Futures.immediateFailedFuture(e); } } - List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); - futures.add(auditLogDao.saveByTenantId(auditLogEntry)); - auditLogSink.logAction(auditLogEntry); - - return Futures.allAsList(futures); + ListenableFuture future = auditLogDao.saveByTenantId(auditLogEntry); + return Futures.transform(future, auditLog -> { + auditLogSink.logAction(auditLogEntry); + return null; + }, MoreExecutors.directExecutor()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java index 7335941982..83b9befc5d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java @@ -55,7 +55,7 @@ public class DummyAuditLogServiceImpl implements AuditLogService { } @Override - public ListenableFuture> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { + public ListenableFuture logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { return null; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java b/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java index 6ba72a377a..812b4c6a0f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java @@ -34,14 +34,18 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.audit.AuditLog; import org.thingsboard.server.common.data.id.TenantId; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Component @ConditionalOnProperty(prefix = "audit-log.sink", value = "type", havingValue = "elasticsearch") @@ -68,6 +72,7 @@ public class ElasticsearchAuditLogSink implements AuditLogSink { private String dateFormat; private RestClient restClient; + private ExecutorService executor; @PostConstruct public void init() { @@ -87,14 +92,32 @@ public class ElasticsearchAuditLogSink implements AuditLogSink { } this.restClient = builder.build(); + this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("elasticsearch-audit-log")); } catch (Exception e) { log.error("Sink init failed!", e); throw new RuntimeException(e.getMessage(), e); } } + @PreDestroy + private void destroy() { + if (executor != null) { + executor.shutdownNow(); + } + } + @Override public void logAction(AuditLog auditLogEntry) { + executor.execute(() -> { + try { + doLogAction(auditLogEntry); + } catch (Exception e) { + log.error("Failed to log action", e); + } + }); + } + + private void doLogAction(AuditLog auditLogEntry) { String jsonContent = createElasticJsonRecord(auditLogEntry); HttpEntity entity = new NStringEntity( diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 58c8bffd07..7926eb0050 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -24,6 +24,7 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; +import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.DaoUtil; @@ -36,13 +37,14 @@ import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; /** * @author Valerii Sosliuk */ @Slf4j @SqlDao -public abstract class JpaAbstractDao, D> +public abstract class JpaAbstractDao, D extends HasId> extends JpaAbstractDaoListeningExecutorService implements Dao { @@ -62,9 +64,7 @@ public abstract class JpaAbstractDao, D> @Override @Transactional public D save(TenantId tenantId, D domain) { - E entity = prepare(domain); - entity = getRepository().save(entity); - return DaoUtil.getData(entity); + return save(tenantId, domain, null); } @Override @@ -75,20 +75,7 @@ public abstract class JpaAbstractDao, D> return d; } - @Override - public D create(TenantId tenantId, D domain) { - E entity = prepare(domain); - if (TransactionSynchronizationManager.isActualTransactionActive()) { - entityManager.persist(entity); - } else { - transactionTemplate.executeWithoutResult(ts -> { - entityManager.persist(entity); - }); - } - return DaoUtil.getData(entity); - } - - private E prepare(D domain) { + protected D save(TenantId tenantId, D domain, Consumer preSaveAction) { E entity; try { entity = getEntityClass().getConstructor(domain.getClass()).newInstance(domain); @@ -98,12 +85,30 @@ public abstract class JpaAbstractDao, D> } setSearchText(entity); log.debug("Saving entity {}", entity); - if (entity.getUuid() == null) { + boolean isNew = entity.getUuid() == null; + if (isNew) { UUID uuid = Uuids.timeBased(); entity.setUuid(uuid); entity.setCreatedTime(Uuids.unixTimestamp(uuid)); } - return entity; + + if (preSaveAction != null) { + preSaveAction.accept(entity); + } + if (TransactionSynchronizationManager.isActualTransactionActive()) { + return doSave(entity, isNew); + } else { + return transactionTemplate.execute(status -> doSave(entity, isNew)); + } + } + + private D doSave(E entity, boolean isNew) { + if (isNew) { + entityManager.persist(entity); + } else { + entity = entityManager.merge(entity); + } + return DaoUtil.getData(entity); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java index f44049c7a1..0e37e3ec4a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; import org.thingsboard.server.common.data.id.AlarmCommentId; @@ -53,11 +54,13 @@ public class JpaAlarmCommentDao extends JpaAbstractDao { + partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 757ecbf899..9ea49fe339 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.sql.audit; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -25,7 +24,6 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; -import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -72,22 +70,15 @@ public class JpaAuditLogDao extends JpaAbstractDao imp } @Override - public ListenableFuture saveByTenantId(AuditLog auditLog) { - return service.submit(() -> { - save(auditLog.getTenantId(), auditLog); - return null; - }); + public ListenableFuture saveByTenantId(AuditLog auditLog) { + return service.submit(() -> save(auditLog.getTenantId(), auditLog)); } @Override public AuditLog save(TenantId tenantId, AuditLog auditLog) { - if (auditLog.getId() == null) { - UUID uuid = Uuids.timeBased(); - auditLog.setId(new AuditLogId(uuid)); - auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); - } - partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - return create(tenantId, auditLog); + return save(tenantId, auditLog, entity -> { + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java index 16ca94395e..5e0a36c072 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java @@ -15,11 +15,11 @@ */ package org.thingsboard.server.dao.sql.notification; -import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.NotificationId; import org.thingsboard.server.common.data.id.NotificationRequestId; @@ -51,17 +51,13 @@ public class JpaNotificationDao extends JpaAbstractDao { partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME, - notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - return create(tenantId, notification); - } - return super.save(tenantId, notification); + entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override From 90f971d018d9e5cec928ab2e22a05a9dfbed2321 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 29 Jan 2024 16:44:04 +0200 Subject: [PATCH 4/7] Refactor daos for partitioned entities --- .../server/dao/alarm/AlarmCommentDao.java | 6 +-- .../dao/alarm/BaseAlarmCommentService.java | 2 +- .../server/dao/audit/AuditLogDao.java | 2 - .../server/dao/audit/AuditLogServiceImpl.java | 31 ++++++----- .../server/dao/sql/JpaAbstractDao.java | 54 +++++++------------ .../dao/sql/alarm/JpaAlarmCommentDao.java | 27 ++++------ .../server/dao/sql/audit/JpaAuditLogDao.java | 24 ++++----- .../dao/sql/edge/JpaBaseEdgeEventDao.java | 17 ++++-- .../sql/notification/JpaNotificationDao.java | 21 ++++---- .../dao/sql/ota/JpaOtaPackageInfoDao.java | 2 + .../dao/sql/alarm/JpaAlarmCommentDaoTest.java | 2 +- 11 files changed, 86 insertions(+), 102 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentDao.java index 947db57532..442bff7516 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentDao.java @@ -18,7 +18,6 @@ package org.thingsboard.server.dao.alarm; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; -import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -29,13 +28,10 @@ import java.util.UUID; public interface AlarmCommentDao extends Dao { - AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment); - - void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId); - AlarmComment findAlarmCommentById(TenantId tenantId, UUID key); PageData findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink); ListenableFuture findAlarmCommentByIdAsync(TenantId tenantId, UUID key); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 6ded31fde3..56dd7a1d9b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -86,7 +86,7 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al if (alarmComment.getType() == null) { alarmComment.setType(AlarmCommentType.OTHER); } - return alarmCommentDao.createAlarmComment(tenantId, alarmComment); + return alarmCommentDao.save(tenantId, alarmComment); } private AlarmComment updateAlarmComment(TenantId tenantId, AlarmComment newAlarmComment) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index fcbece7426..a3bdea1a37 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -30,8 +30,6 @@ import java.util.UUID; public interface AuditLogDao extends Dao { - ListenableFuture saveByTenantId(AuditLog auditLog); - PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink); PageData findAuditLogsByTenantIdAndCustomerId(UUID tenantId, CustomerId customerId, List actionTypes, TimePageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index 94a9ac0442..98b40e52f8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -48,6 +47,7 @@ import org.thingsboard.server.dao.audit.sink.AuditLogSink; import org.thingsboard.server.dao.device.provision.ProvisionRequest; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.service.DataValidator; +import org.thingsboard.server.dao.sql.JpaExecutorService; import java.io.PrintWriter; import java.io.StringWriter; @@ -76,6 +76,9 @@ public class AuditLogServiceImpl implements AuditLogService { @Autowired private AuditLogSink auditLogSink; + @Autowired + private JpaExecutorService executor; + @Autowired private DataValidator auditLogValidator; @@ -380,15 +383,15 @@ public class AuditLogServiceImpl implements AuditLogService { } private ListenableFuture logAction(TenantId tenantId, - EntityId entityId, - String entityName, - CustomerId customerId, - UserId userId, - String userName, - ActionType actionType, - JsonNode actionData, - ActionStatus actionStatus, - String actionFailureDetails) { + EntityId entityId, + String entityName, + CustomerId customerId, + UserId userId, + String userName, + ActionType actionType, + JsonNode actionData, + ActionStatus actionStatus, + String actionFailureDetails) { AuditLog auditLogEntry = createAuditLogEntry(tenantId, entityId, entityName, customerId, userId, userName, actionType, actionData, actionStatus, actionFailureDetails); log.trace("Executing logAction [{}]", auditLogEntry); @@ -402,11 +405,11 @@ public class AuditLogServiceImpl implements AuditLogService { } } - ListenableFuture future = auditLogDao.saveByTenantId(auditLogEntry); - return Futures.transform(future, auditLog -> { - auditLogSink.logAction(auditLogEntry); + return executor.submit(() -> { + AuditLog auditLog = auditLogDao.save(tenantId, auditLogEntry); + auditLogSink.logAction(auditLog); return null; - }, MoreExecutors.directExecutor()); + }); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 7926eb0050..92bc4187c1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -19,12 +19,8 @@ import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronizationManager; -import org.springframework.transaction.support.TransactionTemplate; -import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.DaoUtil; @@ -37,45 +33,26 @@ import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.function.Consumer; /** * @author Valerii Sosliuk */ @Slf4j @SqlDao -public abstract class JpaAbstractDao, D extends HasId> +public abstract class JpaAbstractDao, D> extends JpaAbstractDaoListeningExecutorService implements Dao { @PersistenceContext private EntityManager entityManager; - @Autowired - private TransactionTemplate transactionTemplate; - protected abstract Class getEntityClass(); protected abstract JpaRepository getRepository(); - protected void setSearchText(E entity) { - } - @Override @Transactional public D save(TenantId tenantId, D domain) { - return save(tenantId, domain, null); - } - - @Override - @Transactional - public D saveAndFlush(TenantId tenantId, D domain) { - D d = save(tenantId, domain); - getRepository().flush(); - return d; - } - - protected D save(TenantId tenantId, D domain, Consumer preSaveAction) { E entity; try { entity = getEntityClass().getConstructor(domain.getClass()).newInstance(domain); @@ -83,7 +60,6 @@ public abstract class JpaAbstractDao, D extends HasId log.error("Can't create entity for domain object {}", domain, e); throw new IllegalArgumentException("Can't create entity for domain object {" + domain + "}", e); } - setSearchText(entity); log.debug("Saving entity {}", entity); boolean isNew = entity.getUuid() == null; if (isNew) { @@ -92,17 +68,9 @@ public abstract class JpaAbstractDao, D extends HasId entity.setCreatedTime(Uuids.unixTimestamp(uuid)); } - if (preSaveAction != null) { - preSaveAction.accept(entity); + if (isPartitioned()) { + createPartition(entity); } - if (TransactionSynchronizationManager.isActualTransactionActive()) { - return doSave(entity, isNew); - } else { - return transactionTemplate.execute(status -> doSave(entity, isNew)); - } - } - - private D doSave(E entity, boolean isNew) { if (isNew) { entityManager.persist(entity); } else { @@ -111,6 +79,14 @@ public abstract class JpaAbstractDao, D extends HasId return DaoUtil.getData(entity); } + @Override + @Transactional + public D saveAndFlush(TenantId tenantId, D domain) { + D d = save(tenantId, domain); + getRepository().flush(); + return d; + } + @Override public D findById(TenantId tenantId, UUID key) { log.debug("Get entity by key {}", key); @@ -155,4 +131,12 @@ public abstract class JpaAbstractDao, D extends HasId List entities = Lists.newArrayList(getRepository().findAll()); return DaoUtil.convertDataList(entities); } + + public boolean isPartitioned() { + return false; + } + + public void createPartition(E entity) { + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java index 0e37e3ec4a..47df3bd0c6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java @@ -22,10 +22,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; -import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -54,21 +52,6 @@ public class JpaAlarmCommentDao extends JpaAbstractDao { - partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - }); - } - - @Override - public void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId){ - log.trace("Try to delete entity alarm comment by id using [{}]", alarmCommentId); - alarmCommentRepository.deleteById(alarmCommentId.getId()); - } - @Override public PageData findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink){ log.trace("Try to find alarm comments by alarm id using [{}]", id); @@ -88,6 +71,16 @@ public class JpaAlarmCommentDao extends JpaAbstractDao getEntityClass() { return AlarmCommentEntity.class; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 9ea49fe339..ccada3a21b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.sql.audit; -import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -26,7 +25,6 @@ import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; @@ -69,18 +67,6 @@ public class JpaAuditLogDao extends JpaAbstractDao imp return auditLogRepository; } - @Override - public ListenableFuture saveByTenantId(AuditLog auditLog) { - return service.submit(() -> save(auditLog.getTenantId(), auditLog)); - } - - @Override - public AuditLog save(TenantId tenantId, AuditLog auditLog) { - return save(tenantId, auditLog, entity -> { - partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - }); - } - @Override public PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink) { return DaoUtil.toPageData( @@ -172,4 +158,14 @@ public class JpaAuditLogDao extends JpaAbstractDao imp jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs); } + @Override + public boolean isPartitioned() { + return true; + } + + @Override + public void createPartition(AuditLogEntity entity) { + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index 91aa61baf4..1f8d0c8026 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -47,7 +47,6 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -151,8 +150,9 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao save(EdgeEventEntity entity) { @@ -227,4 +227,15 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao { - partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME, - entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - }); - } - @Override public PageData findUnreadByRecipientIdAndPageLink(TenantId tenantId, UserId recipientId, PageLink pageLink) { return DaoUtil.toPageData(notificationRepository.findByRecipientIdAndStatusNot(recipientId.getId(), NotificationStatus.READ, @@ -110,6 +100,17 @@ public class JpaNotificationDao extends JpaAbstractDao getEntityClass() { return NotificationEntity.class; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/ota/JpaOtaPackageInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/ota/JpaOtaPackageInfoDao.java index 8a65bee2e1..099aebb816 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/ota/JpaOtaPackageInfoDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/ota/JpaOtaPackageInfoDao.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.OtaPackageId; @@ -58,6 +59,7 @@ public class JpaOtaPackageInfoDao extends JpaAbstractDao Date: Tue, 30 Jan 2024 14:42:07 +0200 Subject: [PATCH 5/7] Generic database error message; use TB StringUtils in BaseController --- .../server/controller/BaseController.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 414d13a714..b8a787f56c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -18,10 +18,7 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.ListenableFuture; import lombok.Getter; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.postgresql.util.PSQLException; -import org.postgresql.util.ServerErrorMessage; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -48,6 +45,7 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackageInfo; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; @@ -171,7 +169,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.thingsboard.server.common.data.StringUtils.isNotEmpty; import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD; import static org.thingsboard.server.controller.UserController.YOU_DON_T_HAVE_PERMISSION_TO_PERFORM_THIS_OPERATION; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -378,15 +376,11 @@ public abstract class BaseController { } else if (exception instanceof AsyncRequestTimeoutException) { return new ThingsboardException("Request timeout", ThingsboardErrorCode.GENERAL); } else if (exception instanceof DataAccessException) { - Throwable rootCause = ExceptionUtils.getRootCause(exception); - if (rootCause instanceof PSQLException) { - String sqlError = Optional.ofNullable(((PSQLException) rootCause).getServerErrorMessage()) - .map(ServerErrorMessage::getMessage).orElse(null); - if (isNotEmpty(sqlError)) { - sqlError = StringUtils.capitalize(sqlError); - return new ThingsboardException(sqlError, ThingsboardErrorCode.GENERAL); - } + String errorType = exception.getClass().getSimpleName(); + if (!logControllerErrorStackTrace) { // not to log the error twice + log.warn("Database error: {} - {}", errorType, ExceptionUtils.getRootCauseMessage(exception)); } + return new ThingsboardException("Database error: " + errorType, ThingsboardErrorCode.GENERAL); } return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL); } From 5875a401184874888423ba22f61ce8d840467223 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 30 Jan 2024 15:16:53 +0200 Subject: [PATCH 6/7] Error handling for PartitionChangeEvent listeners --- .../DefaultSubscriptionManagerService.java | 8 ++++---- .../queue/discovery/HashPartitionService.java | 14 ++++++++++++-- .../discovery/TbApplicationEventListener.java | 6 +++++- .../vc/DefaultClusterVersionControlService.java | 2 +- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index b04291a303..c073aa6a95 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; @@ -37,15 +36,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.state.DefaultDeviceStateService; @@ -154,7 +154,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { entitySubscriptions.values().removeIf(sub -> - !partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition()); + !partitionService.isMyPartition(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId())); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index bfc50e076b..de3c394c22 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -251,7 +251,12 @@ public class HashPartitionService implements PartitionService { @Override public boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId) { - return resolve(serviceType, tenantId, entityId).isMyPartition(); + try { + return resolve(serviceType, tenantId, entityId).isMyPartition(); + } catch (TenantNotFoundException e) { + log.warn("Tenant with id {} not found", tenantId, new RuntimeException("stacktrace")); + return false; + } } private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { @@ -376,7 +381,12 @@ public class HashPartitionService implements PartitionService { .collect(Collectors.toList())) .collect(Collectors.joining(System.lineSeparator()))); } - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); + PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap); + try { + applicationEventPublisher.publishEvent(event); + } catch (Exception e) { + log.error("Failed to publish partition change event {}", event, e); + } } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java index 8e805aac3d..341ab0f1eb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java @@ -41,7 +41,11 @@ public abstract class TbApplicationEventListener i seqNumberLock.unlock(); } if (validUpdate && filterTbApplicationEvent(event)) { - onTbApplicationEvent(event); + try { + onTbApplicationEvent(event); + } catch (Exception e) { + log.error("Failed to handle partition change event: {}", event, e); + } } else { log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event); } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index fa45ddf98c..1d78b5ddd1 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -160,7 +160,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { for (TenantId tenantId : vcService.getActiveRepositoryTenants()) { - if (!partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition()) { + if (!partitionService.isMyPartition(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId)) { var lock = getRepoLock(tenantId); lock.lock(); try { From 2d7c1a3cd521fca2187fe3e503c5a5f6c3f7194a Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 31 Jan 2024 12:15:06 +0200 Subject: [PATCH 7/7] Implement JpaPartitionedAbstractDao --- .../server/controller/BaseController.java | 2 +- .../server/dao/sql/JpaAbstractDao.java | 24 +++-------- .../dao/sql/JpaPartitionedAbstractDao.java | 43 +++++++++++++++++++ .../dao/sql/alarm/JpaAlarmCommentDao.java | 11 ++--- .../server/dao/sql/audit/JpaAuditLogDao.java | 9 +--- .../dao/sql/edge/JpaBaseEdgeEventDao.java | 9 +--- .../sql/notification/JpaNotificationDao.java | 9 +--- 7 files changed, 58 insertions(+), 49 deletions(-) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sql/JpaPartitionedAbstractDao.java diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index b8a787f56c..6851029d1e 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -380,7 +380,7 @@ public abstract class BaseController { if (!logControllerErrorStackTrace) { // not to log the error twice log.warn("Database error: {} - {}", errorType, ExceptionUtils.getRootCauseMessage(exception)); } - return new ThingsboardException("Database error: " + errorType, ThingsboardErrorCode.GENERAL); + return new ThingsboardException("Database error", ThingsboardErrorCode.GENERAL); } return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 92bc4187c1..fe0567b8d8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -43,9 +43,6 @@ public abstract class JpaAbstractDao, D> extends JpaAbstractDaoListeningExecutorService implements Dao { - @PersistenceContext - private EntityManager entityManager; - protected abstract Class getEntityClass(); protected abstract JpaRepository getRepository(); @@ -67,18 +64,14 @@ public abstract class JpaAbstractDao, D> entity.setUuid(uuid); entity.setCreatedTime(Uuids.unixTimestamp(uuid)); } - - if (isPartitioned()) { - createPartition(entity); - } - if (isNew) { - entityManager.persist(entity); - } else { - entity = entityManager.merge(entity); - } + entity = doSave(entity, isNew); return DaoUtil.getData(entity); } + protected E doSave(E entity, boolean isNew) { + return getRepository().save(entity); + } + @Override @Transactional public D saveAndFlush(TenantId tenantId, D domain) { @@ -132,11 +125,4 @@ public abstract class JpaAbstractDao, D> return DaoUtil.convertDataList(entities); } - public boolean isPartitioned() { - return false; - } - - public void createPartition(E entity) { - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaPartitionedAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaPartitionedAbstractDao.java new file mode 100644 index 0000000000..76db367ffb --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaPartitionedAbstractDao.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import org.thingsboard.server.dao.model.BaseEntity; +import org.thingsboard.server.dao.util.SqlDao; + +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; + +@SqlDao +public abstract class JpaPartitionedAbstractDao, D> extends JpaAbstractDao { + + @PersistenceContext + private EntityManager entityManager; + + @Override + protected E doSave(E entity, boolean isNew) { + createPartition(entity); + if (isNew) { + entityManager.persist(entity); + } else { + entity = entityManager.merge(entity); + } + return entity; + } + + public abstract void createPartition(E entity); + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java index 47df3bd0c6..1b1dc825ce 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java @@ -31,7 +31,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmCommentDao; import org.thingsboard.server.dao.model.sql.AlarmCommentEntity; -import org.thingsboard.server.dao.sql.JpaAbstractDao; +import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao; import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import org.thingsboard.server.dao.util.SqlDao; @@ -44,7 +44,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.ALARM_COMMENT_TABL @Component @SqlDao @RequiredArgsConstructor -public class JpaAlarmCommentDao extends JpaAbstractDao implements AlarmCommentDao { +public class JpaAlarmCommentDao extends JpaPartitionedAbstractDao implements AlarmCommentDao { private final SqlPartitioningRepository partitioningRepository; @Value("${sql.alarm_comments.partition_size:168}") private int partitionSizeInHours; @@ -53,7 +53,7 @@ public class JpaAlarmCommentDao extends JpaAbstractDao findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink){ + public PageData findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink) { log.trace("Try to find alarm comments by alarm id using [{}]", id); return DaoUtil.toPageData( alarmCommentRepository.findAllByAlarmId(id.getId(), DaoUtil.toPageable(pageLink))); @@ -71,11 +71,6 @@ public class JpaAlarmCommentDao extends JpaAbstractDao implements AuditLogDao { +public class JpaAuditLogDao extends JpaPartitionedAbstractDao implements AuditLogDao { private final AuditLogRepository auditLogRepository; private final SqlPartitioningRepository partitioningRepository; @@ -158,11 +158,6 @@ public class JpaAuditLogDao extends JpaAbstractDao imp jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs); } - @Override - public boolean isPartitioned() { - return true; - } - @Override public void createPartition(AuditLogEntity entity) { partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index 1f8d0c8026..d37e28bd0f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -35,7 +35,7 @@ import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.edge.EdgeEventDao; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.EdgeEventEntity; -import org.thingsboard.server.dao.sql.JpaAbstractDao; +import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao; import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; @@ -57,7 +57,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; @SqlDao @RequiredArgsConstructor @Slf4j -public class JpaBaseEdgeEventDao extends JpaAbstractDao implements EdgeEventDao { +public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao implements EdgeEventDao { private final UUID systemTenantId = NULL_UUID; @@ -228,11 +228,6 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao implements NotificationDao { +public class JpaNotificationDao extends JpaPartitionedAbstractDao implements NotificationDao { private final NotificationRepository notificationRepository; private final SqlPartitioningRepository partitioningRepository; @@ -100,11 +100,6 @@ public class JpaNotificationDao extends JpaAbstractDao