Refactoring for dao save

This commit is contained in:
ViacheslavKlimov 2024-01-29 13:28:24 +02:00
parent 18c354c600
commit feb55f636a
11 changed files with 74 additions and 73 deletions

View File

@ -38,7 +38,7 @@ public interface AuditLogService {
PageData<AuditLog> findAuditLogsByTenantId(TenantId tenantId, List<ActionType> actionTypes, TimePageLink pageLink);
<E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(
<E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(
TenantId tenantId,
CustomerId customerId,
UserId userId,

View File

@ -39,8 +39,6 @@ public interface Dao<T> {
T saveAndFlush(TenantId tenantId, T t);
T create(TenantId tenantId, T t);
boolean removeById(TenantId tenantId, UUID id);
void removeAllByIds(Collection<UUID> ids);

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.dao.alarm;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.ListenableFuture;
@ -33,8 +32,6 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.service.DataValidator;
import java.util.UUID;
import static org.thingsboard.server.dao.service.Validator.validateId;
@Service
@ -89,11 +86,6 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al
if (alarmComment.getType() == null) {
alarmComment.setType(AlarmCommentType.OTHER);
}
if (alarmComment.getId() == null) {
UUID uuid = Uuids.timeBased();
alarmComment.setId(new AlarmCommentId(uuid));
alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid));
}
return alarmCommentDao.createAlarmComment(tenantId, alarmComment);
}

View File

@ -30,7 +30,7 @@ import java.util.UUID;
public interface AuditLogDao extends Dao<AuditLog> {
ListenableFuture<Void> saveByTenantId(AuditLog auditLog);
ListenableFuture<AuditLog> saveByTenantId(AuditLog auditLog);
PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink);

View File

@ -15,13 +15,12 @@
*/
package org.thingsboard.server.dao.audit;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -34,7 +33,6 @@ import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.audit.ActionStatus;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.AuditLogId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -54,7 +52,6 @@ import org.thingsboard.server.dao.service.DataValidator;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.service.Validator.validateEntityId;
@ -66,7 +63,6 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
public class AuditLogServiceImpl implements AuditLogService {
private static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
private static final int INSERTS_PER_ENTRY = 3;
@Autowired
private AuditLogLevelFilter auditLogLevelFilter;
@ -115,7 +111,7 @@ public class AuditLogServiceImpl implements AuditLogService {
}
@Override
public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>>
public <E extends HasName, I extends EntityId> ListenableFuture<Void>
logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity,
ActionType actionType, Exception e, Object... additionalInfo) {
if (canLog(entityId.getEntityType(), actionType)) {
@ -370,9 +366,6 @@ public class AuditLogServiceImpl implements AuditLogService {
ActionStatus actionStatus,
String actionFailureDetails) {
AuditLog result = new AuditLog();
UUID id = Uuids.timeBased();
result.setId(new AuditLogId(id));
result.setCreatedTime(Uuids.unixTimestamp(id));
result.setTenantId(tenantId);
result.setEntityId(entityId);
result.setEntityName(entityName);
@ -386,7 +379,7 @@ public class AuditLogServiceImpl implements AuditLogService {
return result;
}
private ListenableFuture<List<Void>> logAction(TenantId tenantId,
private ListenableFuture<Void> logAction(TenantId tenantId,
EntityId entityId,
String entityName,
CustomerId customerId,
@ -408,12 +401,12 @@ public class AuditLogServiceImpl implements AuditLogService {
return Futures.immediateFailedFuture(e);
}
}
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
futures.add(auditLogDao.saveByTenantId(auditLogEntry));
auditLogSink.logAction(auditLogEntry);
return Futures.allAsList(futures);
ListenableFuture<AuditLog> future = auditLogDao.saveByTenantId(auditLogEntry);
return Futures.transform(future, auditLog -> {
auditLogSink.logAction(auditLogEntry);
return null;
}, MoreExecutors.directExecutor());
}
}

View File

@ -55,7 +55,7 @@ public class DummyAuditLogServiceImpl implements AuditLogService {
}
@Override
public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
public <E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
return null;
}

View File

@ -34,14 +34,18 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.TenantId;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
@ConditionalOnProperty(prefix = "audit-log.sink", value = "type", havingValue = "elasticsearch")
@ -68,6 +72,7 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
private String dateFormat;
private RestClient restClient;
private ExecutorService executor;
@PostConstruct
public void init() {
@ -87,14 +92,32 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
}
this.restClient = builder.build();
this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("elasticsearch-audit-log"));
} catch (Exception e) {
log.error("Sink init failed!", e);
throw new RuntimeException(e.getMessage(), e);
}
}
@PreDestroy
private void destroy() {
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public void logAction(AuditLog auditLogEntry) {
executor.execute(() -> {
try {
doLogAction(auditLogEntry);
} catch (Exception e) {
log.error("Failed to log action", e);
}
});
}
private void doLogAction(AuditLog auditLogEntry) {
String jsonContent = createElasticJsonRecord(auditLogEntry);
HttpEntity entity = new NStringEntity(

View File

@ -24,6 +24,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.server.common.data.id.HasId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.DaoUtil;
@ -36,13 +37,14 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
/**
* @author Valerii Sosliuk
*/
@Slf4j
@SqlDao
public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
public abstract class JpaAbstractDao<E extends BaseEntity<D>, D extends HasId<?>>
extends JpaAbstractDaoListeningExecutorService
implements Dao<D> {
@ -62,9 +64,7 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
@Override
@Transactional
public D save(TenantId tenantId, D domain) {
E entity = prepare(domain);
entity = getRepository().save(entity);
return DaoUtil.getData(entity);
return save(tenantId, domain, null);
}
@Override
@ -75,20 +75,7 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
return d;
}
@Override
public D create(TenantId tenantId, D domain) {
E entity = prepare(domain);
if (TransactionSynchronizationManager.isActualTransactionActive()) {
entityManager.persist(entity);
} else {
transactionTemplate.executeWithoutResult(ts -> {
entityManager.persist(entity);
});
}
return DaoUtil.getData(entity);
}
private E prepare(D domain) {
protected D save(TenantId tenantId, D domain, Consumer<E> preSaveAction) {
E entity;
try {
entity = getEntityClass().getConstructor(domain.getClass()).newInstance(domain);
@ -98,12 +85,30 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
}
setSearchText(entity);
log.debug("Saving entity {}", entity);
if (entity.getUuid() == null) {
boolean isNew = entity.getUuid() == null;
if (isNew) {
UUID uuid = Uuids.timeBased();
entity.setUuid(uuid);
entity.setCreatedTime(Uuids.unixTimestamp(uuid));
}
return entity;
if (preSaveAction != null) {
preSaveAction.accept(entity);
}
if (TransactionSynchronizationManager.isActualTransactionActive()) {
return doSave(entity, isNew);
} else {
return transactionTemplate.execute(status -> doSave(entity, isNew));
}
}
private D doSave(E entity, boolean isNew) {
if (isNew) {
entityManager.persist(entity);
} else {
entity = entityManager.merge(entity);
}
return DaoUtil.getData(entity);
}
@Override

View File

@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
import org.thingsboard.server.common.data.id.AlarmCommentId;
@ -53,11 +54,13 @@ public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, Alarm
@Autowired
private AlarmCommentRepository alarmCommentRepository;
@Transactional
@Override
public AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment){
log.trace("Saving entity {}", alarmComment);
partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, alarmComment.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
return create(tenantId, alarmComment);
return save(tenantId, alarmComment, entity -> {
partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
});
}
@Override

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.dao.sql.audit;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -25,7 +24,6 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.id.AuditLogId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -72,22 +70,15 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
}
@Override
public ListenableFuture<Void> saveByTenantId(AuditLog auditLog) {
return service.submit(() -> {
save(auditLog.getTenantId(), auditLog);
return null;
});
public ListenableFuture<AuditLog> saveByTenantId(AuditLog auditLog) {
return service.submit(() -> save(auditLog.getTenantId(), auditLog));
}
@Override
public AuditLog save(TenantId tenantId, AuditLog auditLog) {
if (auditLog.getId() == null) {
UUID uuid = Uuids.timeBased();
auditLog.setId(new AuditLogId(uuid));
auditLog.setCreatedTime(Uuids.unixTimestamp(uuid));
}
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
return create(tenantId, auditLog);
return save(tenantId, auditLog, entity -> {
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
});
}
@Override

View File

@ -15,11 +15,11 @@
*/
package org.thingsboard.server.dao.sql.notification;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.NotificationId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
@ -51,17 +51,13 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
@Value("${sql.notifications.partition_size:168}")
private int partitionSizeInHours;
@Transactional
@Override
public Notification save(TenantId tenantId, Notification notification) {
if (notification.getId() == null) {
UUID uuid = Uuids.timeBased();
notification.setId(new NotificationId(uuid));
notification.setCreatedTime(Uuids.unixTimestamp(uuid));
return save(tenantId, notification, entity -> {
partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
return create(tenantId, notification);
}
return super.save(tenantId, notification);
entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
});
}
@Override