diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java index 0545e490da..4004477004 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/audit/AuditLogService.java @@ -38,7 +38,7 @@ public interface AuditLogService { PageData findAuditLogsByTenantId(TenantId tenantId, List actionTypes, TimePageLink pageLink); - ListenableFuture> logEntityAction( + ListenableFuture logEntityAction( TenantId tenantId, CustomerId customerId, UserId userId, diff --git a/dao/src/main/java/org/thingsboard/server/dao/Dao.java b/dao/src/main/java/org/thingsboard/server/dao/Dao.java index e26fdb79ac..82b5837bcb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/Dao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/Dao.java @@ -39,8 +39,6 @@ public interface Dao { T saveAndFlush(TenantId tenantId, T t); - T create(TenantId tenantId, T t); - boolean removeById(TenantId tenantId, UUID id); void removeAllByIds(Collection ids); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 218d31dd53..6ded31fde3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.alarm; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.ListenableFuture; @@ -33,8 +32,6 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.service.DataValidator; -import java.util.UUID; - import static org.thingsboard.server.dao.service.Validator.validateId; @Service @@ -89,11 +86,6 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al if (alarmComment.getType() == null) { alarmComment.setType(AlarmCommentType.OTHER); } - if (alarmComment.getId() == null) { - UUID uuid = Uuids.timeBased(); - alarmComment.setId(new AlarmCommentId(uuid)); - alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid)); - } return alarmCommentDao.createAlarmComment(tenantId, alarmComment); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index 6182e9d9ee..fcbece7426 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -30,7 +30,7 @@ import java.util.UUID; public interface AuditLogDao extends Dao { - ListenableFuture saveByTenantId(AuditLog auditLog); + ListenableFuture saveByTenantId(AuditLog auditLog); PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index 4ae3c265c5..94a9ac0442 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -15,13 +15,12 @@ */ package org.thingsboard.server.dao.audit; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.audit.ActionStatus; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; -import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -54,7 +52,6 @@ import org.thingsboard.server.dao.service.DataValidator; import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateEntityId; @@ -66,7 +63,6 @@ import static org.thingsboard.server.dao.service.Validator.validateId; public class AuditLogServiceImpl implements AuditLogService { private static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; - private static final int INSERTS_PER_ENTRY = 3; @Autowired private AuditLogLevelFilter auditLogLevelFilter; @@ -115,7 +111,7 @@ public class AuditLogServiceImpl implements AuditLogService { } @Override - public ListenableFuture> + public ListenableFuture logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { if (canLog(entityId.getEntityType(), actionType)) { @@ -370,9 +366,6 @@ public class AuditLogServiceImpl implements AuditLogService { ActionStatus actionStatus, String actionFailureDetails) { AuditLog result = new AuditLog(); - UUID id = Uuids.timeBased(); - result.setId(new AuditLogId(id)); - result.setCreatedTime(Uuids.unixTimestamp(id)); result.setTenantId(tenantId); result.setEntityId(entityId); result.setEntityName(entityName); @@ -386,7 +379,7 @@ public class AuditLogServiceImpl implements AuditLogService { return result; } - private ListenableFuture> logAction(TenantId tenantId, + private ListenableFuture logAction(TenantId tenantId, EntityId entityId, String entityName, CustomerId customerId, @@ -408,12 +401,12 @@ public class AuditLogServiceImpl implements AuditLogService { return Futures.immediateFailedFuture(e); } } - List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); - futures.add(auditLogDao.saveByTenantId(auditLogEntry)); - auditLogSink.logAction(auditLogEntry); - - return Futures.allAsList(futures); + ListenableFuture future = auditLogDao.saveByTenantId(auditLogEntry); + return Futures.transform(future, auditLog -> { + auditLogSink.logAction(auditLogEntry); + return null; + }, MoreExecutors.directExecutor()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java index 7335941982..83b9befc5d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/DummyAuditLogServiceImpl.java @@ -55,7 +55,7 @@ public class DummyAuditLogServiceImpl implements AuditLogService { } @Override - public ListenableFuture> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { + public ListenableFuture logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) { return null; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java b/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java index 6ba72a377a..812b4c6a0f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/sink/ElasticsearchAuditLogSink.java @@ -34,14 +34,18 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.audit.AuditLog; import org.thingsboard.server.common.data.id.TenantId; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Component @ConditionalOnProperty(prefix = "audit-log.sink", value = "type", havingValue = "elasticsearch") @@ -68,6 +72,7 @@ public class ElasticsearchAuditLogSink implements AuditLogSink { private String dateFormat; private RestClient restClient; + private ExecutorService executor; @PostConstruct public void init() { @@ -87,14 +92,32 @@ public class ElasticsearchAuditLogSink implements AuditLogSink { } this.restClient = builder.build(); + this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("elasticsearch-audit-log")); } catch (Exception e) { log.error("Sink init failed!", e); throw new RuntimeException(e.getMessage(), e); } } + @PreDestroy + private void destroy() { + if (executor != null) { + executor.shutdownNow(); + } + } + @Override public void logAction(AuditLog auditLogEntry) { + executor.execute(() -> { + try { + doLogAction(auditLogEntry); + } catch (Exception e) { + log.error("Failed to log action", e); + } + }); + } + + private void doLogAction(AuditLog auditLogEntry) { String jsonContent = createElasticJsonRecord(auditLogEntry); HttpEntity entity = new NStringEntity( diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java index 58c8bffd07..7926eb0050 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDao.java @@ -24,6 +24,7 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; +import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.DaoUtil; @@ -36,13 +37,14 @@ import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.function.Consumer; /** * @author Valerii Sosliuk */ @Slf4j @SqlDao -public abstract class JpaAbstractDao, D> +public abstract class JpaAbstractDao, D extends HasId> extends JpaAbstractDaoListeningExecutorService implements Dao { @@ -62,9 +64,7 @@ public abstract class JpaAbstractDao, D> @Override @Transactional public D save(TenantId tenantId, D domain) { - E entity = prepare(domain); - entity = getRepository().save(entity); - return DaoUtil.getData(entity); + return save(tenantId, domain, null); } @Override @@ -75,20 +75,7 @@ public abstract class JpaAbstractDao, D> return d; } - @Override - public D create(TenantId tenantId, D domain) { - E entity = prepare(domain); - if (TransactionSynchronizationManager.isActualTransactionActive()) { - entityManager.persist(entity); - } else { - transactionTemplate.executeWithoutResult(ts -> { - entityManager.persist(entity); - }); - } - return DaoUtil.getData(entity); - } - - private E prepare(D domain) { + protected D save(TenantId tenantId, D domain, Consumer preSaveAction) { E entity; try { entity = getEntityClass().getConstructor(domain.getClass()).newInstance(domain); @@ -98,12 +85,30 @@ public abstract class JpaAbstractDao, D> } setSearchText(entity); log.debug("Saving entity {}", entity); - if (entity.getUuid() == null) { + boolean isNew = entity.getUuid() == null; + if (isNew) { UUID uuid = Uuids.timeBased(); entity.setUuid(uuid); entity.setCreatedTime(Uuids.unixTimestamp(uuid)); } - return entity; + + if (preSaveAction != null) { + preSaveAction.accept(entity); + } + if (TransactionSynchronizationManager.isActualTransactionActive()) { + return doSave(entity, isNew); + } else { + return transactionTemplate.execute(status -> doSave(entity, isNew)); + } + } + + private D doSave(E entity, boolean isNew) { + if (isNew) { + entityManager.persist(entity); + } else { + entity = entityManager.merge(entity); + } + return DaoUtil.getData(entity); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java index f44049c7a1..0e37e3ec4a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmCommentDao.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; import org.thingsboard.server.common.data.id.AlarmCommentId; @@ -53,11 +54,13 @@ public class JpaAlarmCommentDao extends JpaAbstractDao { + partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 757ecbf899..9ea49fe339 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.sql.audit; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -25,7 +24,6 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; -import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -72,22 +70,15 @@ public class JpaAuditLogDao extends JpaAbstractDao imp } @Override - public ListenableFuture saveByTenantId(AuditLog auditLog) { - return service.submit(() -> { - save(auditLog.getTenantId(), auditLog); - return null; - }); + public ListenableFuture saveByTenantId(AuditLog auditLog) { + return service.submit(() -> save(auditLog.getTenantId(), auditLog)); } @Override public AuditLog save(TenantId tenantId, AuditLog auditLog) { - if (auditLog.getId() == null) { - UUID uuid = Uuids.timeBased(); - auditLog.setId(new AuditLogId(uuid)); - auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); - } - partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - return create(tenantId, auditLog); + return save(tenantId, auditLog, entity -> { + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java index 16ca94395e..5e0a36c072 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/notification/JpaNotificationDao.java @@ -15,11 +15,11 @@ */ package org.thingsboard.server.dao.sql.notification; -import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.NotificationId; import org.thingsboard.server.common.data.id.NotificationRequestId; @@ -51,17 +51,13 @@ public class JpaNotificationDao extends JpaAbstractDao { partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME, - notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); - return create(tenantId, notification); - } - return super.save(tenantId, notification); + entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + }); } @Override