Merge pull request #10083 from thingsboard/save-improvements
Performance improvements for entities saving
This commit is contained in:
		
						commit
						df39f8b8d6
					
				@ -16,14 +16,13 @@
 | 
			
		||||
package org.thingsboard.server.controller;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.apache.commons.lang3.exception.ExceptionUtils;
 | 
			
		||||
import org.slf4j.Logger;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.dao.DataAccessException;
 | 
			
		||||
import org.springframework.http.MediaType;
 | 
			
		||||
import org.springframework.security.core.Authentication;
 | 
			
		||||
import org.springframework.security.core.context.SecurityContextHolder;
 | 
			
		||||
@ -376,9 +375,14 @@ public abstract class BaseController {
 | 
			
		||||
            return new ThingsboardException("Unable to send mail: " + exception.getMessage(), ThingsboardErrorCode.GENERAL);
 | 
			
		||||
        } else if (exception instanceof AsyncRequestTimeoutException) {
 | 
			
		||||
            return new ThingsboardException("Request timeout", ThingsboardErrorCode.GENERAL);
 | 
			
		||||
        } else {
 | 
			
		||||
            return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL);
 | 
			
		||||
        } else if (exception instanceof DataAccessException) {
 | 
			
		||||
            String errorType = exception.getClass().getSimpleName();
 | 
			
		||||
            if (!logControllerErrorStackTrace) { // not to log the error twice
 | 
			
		||||
                log.warn("Database error: {} - {}", errorType, ExceptionUtils.getRootCauseMessage(exception));
 | 
			
		||||
            }
 | 
			
		||||
            return new ThingsboardException("Database error", ThingsboardErrorCode.GENERAL);
 | 
			
		||||
        }
 | 
			
		||||
        return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
 | 
			
		||||
@ -38,7 +38,7 @@ public interface AuditLogService {
 | 
			
		||||
 | 
			
		||||
    PageData<AuditLog> findAuditLogsByTenantId(TenantId tenantId, List<ActionType> actionTypes, TimePageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    <E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(
 | 
			
		||||
    <E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(
 | 
			
		||||
            TenantId tenantId,
 | 
			
		||||
            CustomerId customerId,
 | 
			
		||||
            UserId userId,
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,6 @@ package org.thingsboard.server.dao.alarm;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmCommentId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
@ -29,13 +28,10 @@ import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
public interface AlarmCommentDao extends Dao<AlarmComment> {
 | 
			
		||||
 | 
			
		||||
    AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment);
 | 
			
		||||
 | 
			
		||||
    void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId);
 | 
			
		||||
 | 
			
		||||
    AlarmComment findAlarmCommentById(TenantId tenantId, UUID key);
 | 
			
		||||
 | 
			
		||||
    PageData<AlarmCommentInfo> findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<AlarmComment> findAlarmCommentByIdAsync(TenantId tenantId, UUID key);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,7 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.alarm;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
@ -33,8 +32,6 @@ import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
 | 
			
		||||
import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@ -89,12 +86,7 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al
 | 
			
		||||
        if (alarmComment.getType() == null) {
 | 
			
		||||
            alarmComment.setType(AlarmCommentType.OTHER);
 | 
			
		||||
        }
 | 
			
		||||
        if (alarmComment.getId() == null) {
 | 
			
		||||
            UUID uuid = Uuids.timeBased();
 | 
			
		||||
            alarmComment.setId(new AlarmCommentId(uuid));
 | 
			
		||||
            alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
        }
 | 
			
		||||
        return alarmCommentDao.createAlarmComment(tenantId, alarmComment);
 | 
			
		||||
        return alarmCommentDao.save(tenantId, alarmComment);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private AlarmComment updateAlarmComment(TenantId tenantId, AlarmComment newAlarmComment) {
 | 
			
		||||
 | 
			
		||||
@ -30,8 +30,6 @@ import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
public interface AuditLogDao extends Dao<AuditLog> {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> saveByTenantId(AuditLog auditLog);
 | 
			
		||||
 | 
			
		||||
    PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    PageData<AuditLog> findAuditLogsByTenantIdAndCustomerId(UUID tenantId, CustomerId customerId, List<ActionType> actionTypes, TimePageLink pageLink);
 | 
			
		||||
 | 
			
		||||
@ -15,11 +15,9 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.audit;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ArrayNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
			
		||||
import com.google.common.collect.Lists;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
@ -34,7 +32,6 @@ import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.ActionStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.ActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.AuditLog;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AuditLogId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
@ -50,11 +47,11 @@ import org.thingsboard.server.dao.audit.sink.AuditLogSink;
 | 
			
		||||
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
 | 
			
		||||
import org.thingsboard.server.dao.entity.EntityService;
 | 
			
		||||
import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaExecutorService;
 | 
			
		||||
 | 
			
		||||
import java.io.PrintWriter;
 | 
			
		||||
import java.io.StringWriter;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.service.Validator.validateEntityId;
 | 
			
		||||
@ -66,7 +63,6 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
 | 
			
		||||
public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
 | 
			
		||||
    private static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
 | 
			
		||||
    private static final int INSERTS_PER_ENTRY = 3;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private AuditLogLevelFilter auditLogLevelFilter;
 | 
			
		||||
@ -80,6 +76,9 @@ public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private AuditLogSink auditLogSink;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private JpaExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private DataValidator<AuditLog> auditLogValidator;
 | 
			
		||||
 | 
			
		||||
@ -115,7 +114,7 @@ public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>>
 | 
			
		||||
    public <E extends HasName, I extends EntityId> ListenableFuture<Void>
 | 
			
		||||
    logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity,
 | 
			
		||||
                    ActionType actionType, Exception e, Object... additionalInfo) {
 | 
			
		||||
        if (canLog(entityId.getEntityType(), actionType)) {
 | 
			
		||||
@ -370,9 +369,6 @@ public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
                                         ActionStatus actionStatus,
 | 
			
		||||
                                         String actionFailureDetails) {
 | 
			
		||||
        AuditLog result = new AuditLog();
 | 
			
		||||
        UUID id = Uuids.timeBased();
 | 
			
		||||
        result.setId(new AuditLogId(id));
 | 
			
		||||
        result.setCreatedTime(Uuids.unixTimestamp(id));
 | 
			
		||||
        result.setTenantId(tenantId);
 | 
			
		||||
        result.setEntityId(entityId);
 | 
			
		||||
        result.setEntityName(entityName);
 | 
			
		||||
@ -386,7 +382,7 @@ public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<List<Void>> logAction(TenantId tenantId,
 | 
			
		||||
    private ListenableFuture<Void> logAction(TenantId tenantId,
 | 
			
		||||
                                             EntityId entityId,
 | 
			
		||||
                                             String entityName,
 | 
			
		||||
                                             CustomerId customerId,
 | 
			
		||||
@ -408,12 +404,12 @@ public class AuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
                return Futures.immediateFailedFuture(e);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
 | 
			
		||||
        futures.add(auditLogDao.saveByTenantId(auditLogEntry));
 | 
			
		||||
 | 
			
		||||
        auditLogSink.logAction(auditLogEntry);
 | 
			
		||||
 | 
			
		||||
        return Futures.allAsList(futures);
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            AuditLog auditLog = auditLogDao.save(tenantId, auditLogEntry);
 | 
			
		||||
            auditLogSink.logAction(auditLog);
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -55,7 +55,7 @@ public class DummyAuditLogServiceImpl implements AuditLogService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
 | 
			
		||||
    public <E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -34,14 +34,18 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.http.HttpMethod;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.AuditLog;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.PostConstruct;
 | 
			
		||||
import javax.annotation.PreDestroy;
 | 
			
		||||
import java.time.LocalDateTime;
 | 
			
		||||
import java.time.format.DateTimeFormatter;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@ConditionalOnProperty(prefix = "audit-log.sink", value = "type", havingValue = "elasticsearch")
 | 
			
		||||
@ -68,6 +72,7 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
 | 
			
		||||
    private String dateFormat;
 | 
			
		||||
 | 
			
		||||
    private RestClient restClient;
 | 
			
		||||
    private ExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
@ -87,14 +92,32 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            this.restClient = builder.build();
 | 
			
		||||
            this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("elasticsearch-audit-log"));
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("Sink init failed!", e);
 | 
			
		||||
            throw new RuntimeException(e.getMessage(), e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
    private void destroy() {
 | 
			
		||||
        if (executor != null) {
 | 
			
		||||
            executor.shutdownNow();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void logAction(AuditLog auditLogEntry) {
 | 
			
		||||
        executor.execute(() -> {
 | 
			
		||||
            try {
 | 
			
		||||
                doLogAction(auditLogEntry);
 | 
			
		||||
            } catch (Exception e) {
 | 
			
		||||
                log.error("Failed to log action", e);
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void doLogAction(AuditLog auditLogEntry) {
 | 
			
		||||
        String jsonContent = createElasticJsonRecord(auditLogEntry);
 | 
			
		||||
 | 
			
		||||
        HttpEntity entity = new NStringEntity(
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,8 @@ import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.model.BaseEntity;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import javax.persistence.EntityManager;
 | 
			
		||||
import javax.persistence.PersistenceContext;
 | 
			
		||||
import java.util.Collection;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
@ -45,9 +47,6 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
 | 
			
		||||
 | 
			
		||||
    protected abstract JpaRepository<E, UUID> getRepository();
 | 
			
		||||
 | 
			
		||||
    protected void setSearchText(E entity) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    @Transactional
 | 
			
		||||
    public D save(TenantId tenantId, D domain) {
 | 
			
		||||
@ -58,17 +57,21 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
 | 
			
		||||
            log.error("Can't create entity for domain object {}", domain, e);
 | 
			
		||||
            throw new IllegalArgumentException("Can't create entity for domain object {" + domain + "}", e);
 | 
			
		||||
        }
 | 
			
		||||
        setSearchText(entity);
 | 
			
		||||
        log.debug("Saving entity {}", entity);
 | 
			
		||||
        if (entity.getUuid() == null) {
 | 
			
		||||
        boolean isNew = entity.getUuid() == null;
 | 
			
		||||
        if (isNew) {
 | 
			
		||||
            UUID uuid = Uuids.timeBased();
 | 
			
		||||
            entity.setUuid(uuid);
 | 
			
		||||
            entity.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
        }
 | 
			
		||||
        entity = getRepository().save(entity);
 | 
			
		||||
        entity = doSave(entity, isNew);
 | 
			
		||||
        return DaoUtil.getData(entity);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected E doSave(E entity, boolean isNew) {
 | 
			
		||||
        return getRepository().save(entity);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    @Transactional
 | 
			
		||||
    public D saveAndFlush(TenantId tenantId, D domain) {
 | 
			
		||||
@ -121,4 +124,5 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
 | 
			
		||||
        List<E> entities = Lists.newArrayList(getRepository().findAll());
 | 
			
		||||
        return DaoUtil.convertDataList(entities);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,43 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2023 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.dao.model.BaseEntity;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import javax.persistence.EntityManager;
 | 
			
		||||
import javax.persistence.PersistenceContext;
 | 
			
		||||
 | 
			
		||||
@SqlDao
 | 
			
		||||
public abstract class JpaPartitionedAbstractDao<E extends BaseEntity<D>, D> extends JpaAbstractDao<E, D> {
 | 
			
		||||
 | 
			
		||||
    @PersistenceContext
 | 
			
		||||
    private EntityManager entityManager;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected E doSave(E entity, boolean isNew) {
 | 
			
		||||
        createPartition(entity);
 | 
			
		||||
        if (isNew) {
 | 
			
		||||
            entityManager.persist(entity);
 | 
			
		||||
        } else {
 | 
			
		||||
            entity = entityManager.merge(entity);
 | 
			
		||||
        }
 | 
			
		||||
        return entity;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public abstract void createPartition(E entity);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -24,7 +24,6 @@ import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmCommentId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
@ -32,7 +31,7 @@ import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmCommentDao;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.AlarmCommentEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
@ -45,7 +44,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.ALARM_COMMENT_TABL
 | 
			
		||||
@Component
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, AlarmComment> implements AlarmCommentDao {
 | 
			
		||||
public class JpaAlarmCommentDao extends JpaPartitionedAbstractDao<AlarmCommentEntity, AlarmComment> implements AlarmCommentDao {
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
    @Value("${sql.alarm_comments.partition_size:168}")
 | 
			
		||||
    private int partitionSizeInHours;
 | 
			
		||||
@ -53,20 +52,6 @@ public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, Alarm
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private AlarmCommentRepository alarmCommentRepository;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment){
 | 
			
		||||
        log.trace("Saving entity {}", alarmComment);
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, alarmComment.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        AlarmCommentEntity saved = alarmCommentRepository.save(new AlarmCommentEntity(alarmComment));
 | 
			
		||||
        return DaoUtil.getData(saved);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId){
 | 
			
		||||
        log.trace("Try to delete entity alarm comment by id using [{}]", alarmCommentId);
 | 
			
		||||
        alarmCommentRepository.deleteById(alarmCommentId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<AlarmCommentInfo> findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink) {
 | 
			
		||||
        log.trace("Try to find alarm comments by alarm id using [{}]", id);
 | 
			
		||||
@ -86,6 +71,11 @@ public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, Alarm
 | 
			
		||||
        return findByIdAsync(tenantId, key);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createPartition(AlarmCommentEntity entity) {
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected Class<AlarmCommentEntity> getEntityClass() {
 | 
			
		||||
        return AlarmCommentEntity.class;
 | 
			
		||||
 | 
			
		||||
@ -15,8 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql.audit;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
@ -25,10 +23,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.ActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.AuditLog;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AuditLogId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
@ -36,12 +32,11 @@ import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.audit.AuditLogDao;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.AuditLogEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
@ -49,7 +44,7 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
 | 
			
		||||
public class JpaAuditLogDao extends JpaPartitionedAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
 | 
			
		||||
 | 
			
		||||
    private final AuditLogRepository auditLogRepository;
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
@ -72,25 +67,6 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
 | 
			
		||||
        return auditLogRepository;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> saveByTenantId(AuditLog auditLog) {
 | 
			
		||||
        return service.submit(() -> {
 | 
			
		||||
            save(auditLog.getTenantId(), auditLog);
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AuditLog save(TenantId tenantId, AuditLog auditLog) {
 | 
			
		||||
        if (auditLog.getId() == null) {
 | 
			
		||||
            UUID uuid = Uuids.timeBased();
 | 
			
		||||
            auditLog.setId(new AuditLogId(uuid));
 | 
			
		||||
            auditLog.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
        }
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        return super.save(tenantId, auditLog);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) {
 | 
			
		||||
        return DaoUtil.toPageData(
 | 
			
		||||
@ -182,4 +158,9 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
 | 
			
		||||
        jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createPartition(AuditLogEntity entity) {
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -35,7 +35,7 @@ import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
 | 
			
		||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
 | 
			
		||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
 | 
			
		||||
@ -47,7 +47,6 @@ import javax.annotation.PreDestroy;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Comparator;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.function.Function;
 | 
			
		||||
@ -58,7 +57,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
 | 
			
		||||
public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
 | 
			
		||||
 | 
			
		||||
    private final UUID systemTenantId = NULL_UUID;
 | 
			
		||||
 | 
			
		||||
@ -151,8 +150,9 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
 | 
			
		||||
        if (StringUtils.isEmpty(edgeEvent.getUid())) {
 | 
			
		||||
            edgeEvent.setUid(edgeEvent.getId().toString());
 | 
			
		||||
        }
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(TABLE_NAME, edgeEvent.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        return save(new EdgeEventEntity(edgeEvent));
 | 
			
		||||
        EdgeEventEntity entity = new EdgeEventEntity(edgeEvent);
 | 
			
		||||
        createPartition(entity);
 | 
			
		||||
        return save(entity);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> save(EdgeEventEntity entity) {
 | 
			
		||||
@ -227,4 +227,10 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
 | 
			
		||||
    private void callMigrationFunction(long startTime, long endTime, long partitionSIzeInMs) {
 | 
			
		||||
        jdbcTemplate.update("CALL migrate_edge_event(?, ?, ?)", startTime, endTime, partitionSIzeInMs);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createPartition(EdgeEventEntity entity) {
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,8 +15,6 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql.notification;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.google.common.base.Strings;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
@ -34,7 +32,7 @@ import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.NotificationEntity;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
@ -44,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 | 
			
		||||
@Component
 | 
			
		||||
@SqlDao
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notification> implements NotificationDao {
 | 
			
		||||
public class JpaNotificationDao extends JpaPartitionedAbstractDao<NotificationEntity, Notification> implements NotificationDao {
 | 
			
		||||
 | 
			
		||||
    private final NotificationRepository notificationRepository;
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
@ -52,18 +50,6 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
 | 
			
		||||
    @Value("${sql.notifications.partition_size:168}")
 | 
			
		||||
    private int partitionSizeInHours;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Notification save(TenantId tenantId, Notification notification) {
 | 
			
		||||
        if (notification.getId() == null) {
 | 
			
		||||
            UUID uuid = Uuids.timeBased();
 | 
			
		||||
            notification.setId(new NotificationId(uuid));
 | 
			
		||||
            notification.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
            partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
 | 
			
		||||
                    notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        }
 | 
			
		||||
        return super.save(tenantId, notification);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<Notification> findUnreadByRecipientIdAndPageLink(TenantId tenantId, UserId recipientId, PageLink pageLink) {
 | 
			
		||||
        return DaoUtil.toPageData(notificationRepository.findByRecipientIdAndStatusNot(recipientId.getId(), NotificationStatus.READ,
 | 
			
		||||
@ -114,6 +100,12 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
 | 
			
		||||
        notificationRepository.deleteByRecipientId(recipientId.getId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void createPartition(NotificationEntity entity) {
 | 
			
		||||
        partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
 | 
			
		||||
                entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected Class<NotificationEntity> getEntityClass() {
 | 
			
		||||
        return NotificationEntity.class;
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.springframework.transaction.annotation.Transactional;
 | 
			
		||||
import org.thingsboard.server.common.data.OtaPackageInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.OtaPackageId;
 | 
			
		||||
@ -58,6 +59,7 @@ public class JpaOtaPackageInfoDao extends JpaAbstractDao<OtaPackageInfoEntity, O
 | 
			
		||||
        return DaoUtil.getData(otaPackageInfoRepository.findOtaPackageInfoById(id));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Transactional
 | 
			
		||||
    @Override
 | 
			
		||||
    public OtaPackageInfo save(TenantId tenantId, OtaPackageInfo otaPackageInfo) {
 | 
			
		||||
        OtaPackageInfo savedOtaPackage = super.save(tenantId, otaPackageInfo);
 | 
			
		||||
 | 
			
		||||
@ -85,6 +85,6 @@ public class JpaAlarmCommentDaoTest extends AbstractJpaDaoTest {
 | 
			
		||||
        alarmComment.setUserId(new UserId(userId));
 | 
			
		||||
        alarmComment.setType(type);
 | 
			
		||||
        alarmComment.setComment(JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10)));
 | 
			
		||||
        alarmCommentDao.createAlarmComment(TenantId.fromUUID(UUID.randomUUID()), alarmComment);
 | 
			
		||||
        alarmCommentDao.save(TenantId.fromUUID(UUID.randomUUID()), alarmComment);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user