Merge branch 'hotfix/3.6.2' into fix/actors-init
# Conflicts: # common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java
This commit is contained in:
commit
1f50a8c3ad
@ -16,14 +16,13 @@
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
@ -376,9 +375,14 @@ public abstract class BaseController {
|
||||
return new ThingsboardException("Unable to send mail: " + exception.getMessage(), ThingsboardErrorCode.GENERAL);
|
||||
} else if (exception instanceof AsyncRequestTimeoutException) {
|
||||
return new ThingsboardException("Request timeout", ThingsboardErrorCode.GENERAL);
|
||||
} else {
|
||||
return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL);
|
||||
} else if (exception instanceof DataAccessException) {
|
||||
String errorType = exception.getClass().getSimpleName();
|
||||
if (!logControllerErrorStackTrace) { // not to log the error twice
|
||||
log.warn("Database error: {} - {}", errorType, ExceptionUtils.getRootCauseMessage(exception));
|
||||
}
|
||||
return new ThingsboardException("Database error", ThingsboardErrorCode.GENERAL);
|
||||
}
|
||||
return new ThingsboardException(exception.getMessage(), exception, ThingsboardErrorCode.GENERAL);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -37,15 +36,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
||||
@ -154,7 +154,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
||||
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
|
||||
entitySubscriptions.values().removeIf(sub ->
|
||||
!partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition());
|
||||
!partitionService.isMyPartition(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -38,7 +38,7 @@ public interface AuditLogService {
|
||||
|
||||
PageData<AuditLog> findAuditLogsByTenantId(TenantId tenantId, List<ActionType> actionTypes, TimePageLink pageLink);
|
||||
|
||||
<E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(
|
||||
<E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(
|
||||
TenantId tenantId,
|
||||
CustomerId customerId,
|
||||
UserId userId,
|
||||
|
||||
@ -259,7 +259,12 @@ public class HashPartitionService implements PartitionService {
|
||||
|
||||
@Override
|
||||
public boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
|
||||
try {
|
||||
return resolve(serviceType, tenantId, entityId).isMyPartition();
|
||||
} catch (TenantNotFoundException e) {
|
||||
log.warn("Tenant with id {} not found", tenantId, new RuntimeException("stacktrace"));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
|
||||
@ -379,7 +384,12 @@ public class HashPartitionService implements PartitionService {
|
||||
.map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted()
|
||||
.collect(Collectors.joining(", ")) + "]")
|
||||
.collect(Collectors.joining(System.lineSeparator())));
|
||||
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
|
||||
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap);
|
||||
try {
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to publish partition change event {}", event, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -41,7 +41,11 @@ public abstract class TbApplicationEventListener<T extends TbApplicationEvent> i
|
||||
seqNumberLock.unlock();
|
||||
}
|
||||
if (validUpdate && filterTbApplicationEvent(event)) {
|
||||
try {
|
||||
onTbApplicationEvent(event);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to handle partition change event: {}", event, e);
|
||||
}
|
||||
} else {
|
||||
log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event);
|
||||
}
|
||||
|
||||
@ -160,7 +160,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
|
||||
@Override
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||
for (TenantId tenantId : vcService.getActiveRepositoryTenants()) {
|
||||
if (!partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition()) {
|
||||
if (!partitionService.isMyPartition(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId)) {
|
||||
var lock = getRepoLock(tenantId);
|
||||
lock.lock();
|
||||
try {
|
||||
|
||||
@ -18,7 +18,6 @@ package org.thingsboard.server.dao.alarm;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
|
||||
import org.thingsboard.server.common.data.id.AlarmCommentId;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
@ -29,13 +28,10 @@ import java.util.UUID;
|
||||
|
||||
public interface AlarmCommentDao extends Dao<AlarmComment> {
|
||||
|
||||
AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment);
|
||||
|
||||
void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId);
|
||||
|
||||
AlarmComment findAlarmCommentById(TenantId tenantId, UUID key);
|
||||
|
||||
PageData<AlarmCommentInfo> findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink);
|
||||
|
||||
ListenableFuture<AlarmComment> findAlarmCommentByIdAsync(TenantId tenantId, UUID key);
|
||||
|
||||
}
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.alarm;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
@ -33,8 +32,6 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||
|
||||
@Service
|
||||
@ -89,12 +86,7 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al
|
||||
if (alarmComment.getType() == null) {
|
||||
alarmComment.setType(AlarmCommentType.OTHER);
|
||||
}
|
||||
if (alarmComment.getId() == null) {
|
||||
UUID uuid = Uuids.timeBased();
|
||||
alarmComment.setId(new AlarmCommentId(uuid));
|
||||
alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid));
|
||||
}
|
||||
return alarmCommentDao.createAlarmComment(tenantId, alarmComment);
|
||||
return alarmCommentDao.save(tenantId, alarmComment);
|
||||
}
|
||||
|
||||
private AlarmComment updateAlarmComment(TenantId tenantId, AlarmComment newAlarmComment) {
|
||||
|
||||
@ -30,8 +30,6 @@ import java.util.UUID;
|
||||
|
||||
public interface AuditLogDao extends Dao<AuditLog> {
|
||||
|
||||
ListenableFuture<Void> saveByTenantId(AuditLog auditLog);
|
||||
|
||||
PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink);
|
||||
|
||||
PageData<AuditLog> findAuditLogsByTenantIdAndCustomerId(UUID tenantId, CustomerId customerId, List<ActionType> actionTypes, TimePageLink pageLink);
|
||||
|
||||
@ -15,11 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.audit;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -34,7 +32,6 @@ import org.thingsboard.server.common.data.alarm.AlarmComment;
|
||||
import org.thingsboard.server.common.data.audit.ActionStatus;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.AuditLogId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -50,11 +47,11 @@ import org.thingsboard.server.dao.audit.sink.AuditLogSink;
|
||||
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
|
||||
import org.thingsboard.server.dao.entity.EntityService;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.sql.JpaExecutorService;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.dao.service.Validator.validateEntityId;
|
||||
@ -66,7 +63,6 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||
public class AuditLogServiceImpl implements AuditLogService {
|
||||
|
||||
private static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
|
||||
private static final int INSERTS_PER_ENTRY = 3;
|
||||
|
||||
@Autowired
|
||||
private AuditLogLevelFilter auditLogLevelFilter;
|
||||
@ -80,6 +76,9 @@ public class AuditLogServiceImpl implements AuditLogService {
|
||||
@Autowired
|
||||
private AuditLogSink auditLogSink;
|
||||
|
||||
@Autowired
|
||||
private JpaExecutorService executor;
|
||||
|
||||
@Autowired
|
||||
private DataValidator<AuditLog> auditLogValidator;
|
||||
|
||||
@ -115,7 +114,7 @@ public class AuditLogServiceImpl implements AuditLogService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>>
|
||||
public <E extends HasName, I extends EntityId> ListenableFuture<Void>
|
||||
logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity,
|
||||
ActionType actionType, Exception e, Object... additionalInfo) {
|
||||
if (canLog(entityId.getEntityType(), actionType)) {
|
||||
@ -370,9 +369,6 @@ public class AuditLogServiceImpl implements AuditLogService {
|
||||
ActionStatus actionStatus,
|
||||
String actionFailureDetails) {
|
||||
AuditLog result = new AuditLog();
|
||||
UUID id = Uuids.timeBased();
|
||||
result.setId(new AuditLogId(id));
|
||||
result.setCreatedTime(Uuids.unixTimestamp(id));
|
||||
result.setTenantId(tenantId);
|
||||
result.setEntityId(entityId);
|
||||
result.setEntityName(entityName);
|
||||
@ -386,7 +382,7 @@ public class AuditLogServiceImpl implements AuditLogService {
|
||||
return result;
|
||||
}
|
||||
|
||||
private ListenableFuture<List<Void>> logAction(TenantId tenantId,
|
||||
private ListenableFuture<Void> logAction(TenantId tenantId,
|
||||
EntityId entityId,
|
||||
String entityName,
|
||||
CustomerId customerId,
|
||||
@ -408,12 +404,12 @@ public class AuditLogServiceImpl implements AuditLogService {
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
|
||||
futures.add(auditLogDao.saveByTenantId(auditLogEntry));
|
||||
|
||||
auditLogSink.logAction(auditLogEntry);
|
||||
|
||||
return Futures.allAsList(futures);
|
||||
return executor.submit(() -> {
|
||||
AuditLog auditLog = auditLogDao.save(tenantId, auditLogEntry);
|
||||
auditLogSink.logAction(auditLog);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -55,7 +55,7 @@ public class DummyAuditLogServiceImpl implements AuditLogService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <E extends HasName, I extends EntityId> ListenableFuture<List<Void>> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
|
||||
public <E extends HasName, I extends EntityId> ListenableFuture<Void> logEntityAction(TenantId tenantId, CustomerId customerId, UserId userId, String userName, I entityId, E entity, ActionType actionType, Exception e, Object... additionalInfo) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@ -34,14 +34,18 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "audit-log.sink", value = "type", havingValue = "elasticsearch")
|
||||
@ -68,6 +72,7 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
|
||||
private String dateFormat;
|
||||
|
||||
private RestClient restClient;
|
||||
private ExecutorService executor;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
@ -87,14 +92,32 @@ public class ElasticsearchAuditLogSink implements AuditLogSink {
|
||||
}
|
||||
|
||||
this.restClient = builder.build();
|
||||
this.executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("elasticsearch-audit-log"));
|
||||
} catch (Exception e) {
|
||||
log.error("Sink init failed!", e);
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void destroy() {
|
||||
if (executor != null) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logAction(AuditLog auditLogEntry) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
doLogAction(auditLogEntry);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to log action", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void doLogAction(AuditLog auditLogEntry) {
|
||||
String jsonContent = createElasticJsonRecord(auditLogEntry);
|
||||
|
||||
HttpEntity entity = new NStringEntity(
|
||||
|
||||
@ -27,6 +27,8 @@ import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.BaseEntity;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -45,9 +47,6 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
|
||||
|
||||
protected abstract JpaRepository<E, UUID> getRepository();
|
||||
|
||||
protected void setSearchText(E entity) {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public D save(TenantId tenantId, D domain) {
|
||||
@ -58,17 +57,21 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
|
||||
log.error("Can't create entity for domain object {}", domain, e);
|
||||
throw new IllegalArgumentException("Can't create entity for domain object {" + domain + "}", e);
|
||||
}
|
||||
setSearchText(entity);
|
||||
log.debug("Saving entity {}", entity);
|
||||
if (entity.getUuid() == null) {
|
||||
boolean isNew = entity.getUuid() == null;
|
||||
if (isNew) {
|
||||
UUID uuid = Uuids.timeBased();
|
||||
entity.setUuid(uuid);
|
||||
entity.setCreatedTime(Uuids.unixTimestamp(uuid));
|
||||
}
|
||||
entity = getRepository().save(entity);
|
||||
entity = doSave(entity, isNew);
|
||||
return DaoUtil.getData(entity);
|
||||
}
|
||||
|
||||
protected E doSave(E entity, boolean isNew) {
|
||||
return getRepository().save(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public D saveAndFlush(TenantId tenantId, D domain) {
|
||||
@ -121,4 +124,5 @@ public abstract class JpaAbstractDao<E extends BaseEntity<D>, D>
|
||||
List<E> entities = Lists.newArrayList(getRepository().findAll());
|
||||
return DaoUtil.convertDataList(entities);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql;
|
||||
|
||||
import org.thingsboard.server.dao.model.BaseEntity;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
|
||||
@SqlDao
|
||||
public abstract class JpaPartitionedAbstractDao<E extends BaseEntity<D>, D> extends JpaAbstractDao<E, D> {
|
||||
|
||||
@PersistenceContext
|
||||
private EntityManager entityManager;
|
||||
|
||||
@Override
|
||||
protected E doSave(E entity, boolean isNew) {
|
||||
createPartition(entity);
|
||||
if (isNew) {
|
||||
entityManager.persist(entity);
|
||||
} else {
|
||||
entity = entityManager.merge(entity);
|
||||
}
|
||||
return entity;
|
||||
}
|
||||
|
||||
public abstract void createPartition(E entity);
|
||||
|
||||
}
|
||||
@ -24,7 +24,6 @@ import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
|
||||
import org.thingsboard.server.common.data.id.AlarmCommentId;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
@ -32,7 +31,7 @@ import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.alarm.AlarmCommentDao;
|
||||
import org.thingsboard.server.dao.model.sql.AlarmCommentEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
@ -45,7 +44,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.ALARM_COMMENT_TABL
|
||||
@Component
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, AlarmComment> implements AlarmCommentDao {
|
||||
public class JpaAlarmCommentDao extends JpaPartitionedAbstractDao<AlarmCommentEntity, AlarmComment> implements AlarmCommentDao {
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
@Value("${sql.alarm_comments.partition_size:168}")
|
||||
private int partitionSizeInHours;
|
||||
@ -54,21 +53,7 @@ public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, Alarm
|
||||
private AlarmCommentRepository alarmCommentRepository;
|
||||
|
||||
@Override
|
||||
public AlarmComment createAlarmComment(TenantId tenantId, AlarmComment alarmComment){
|
||||
log.trace("Saving entity {}", alarmComment);
|
||||
partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, alarmComment.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
AlarmCommentEntity saved = alarmCommentRepository.save(new AlarmCommentEntity(alarmComment));
|
||||
return DaoUtil.getData(saved);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAlarmComment(TenantId tenantId, AlarmCommentId alarmCommentId){
|
||||
log.trace("Try to delete entity alarm comment by id using [{}]", alarmCommentId);
|
||||
alarmCommentRepository.deleteById(alarmCommentId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<AlarmCommentInfo> findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink){
|
||||
public PageData<AlarmCommentInfo> findAlarmComments(TenantId tenantId, AlarmId id, PageLink pageLink) {
|
||||
log.trace("Try to find alarm comments by alarm id using [{}]", id);
|
||||
return DaoUtil.toPageData(
|
||||
alarmCommentRepository.findAllByAlarmId(id.getId(), DaoUtil.toPageable(pageLink)));
|
||||
@ -86,6 +71,11 @@ public class JpaAlarmCommentDao extends JpaAbstractDao<AlarmCommentEntity, Alarm
|
||||
return findByIdAsync(tenantId, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createPartition(AlarmCommentEntity entity) {
|
||||
partitioningRepository.createPartitionIfNotExists(ALARM_COMMENT_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<AlarmCommentEntity> getEntityClass() {
|
||||
return AlarmCommentEntity.class;
|
||||
|
||||
@ -15,8 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.audit;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@ -25,10 +23,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.AuditLogId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
@ -36,12 +32,11 @@ import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.audit.AuditLogDao;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.model.sql.AuditLogEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -49,7 +44,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
|
||||
public class JpaAuditLogDao extends JpaPartitionedAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
|
||||
|
||||
private final AuditLogRepository auditLogRepository;
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
@ -72,25 +67,6 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
|
||||
return auditLogRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> saveByTenantId(AuditLog auditLog) {
|
||||
return service.submit(() -> {
|
||||
save(auditLog.getTenantId(), auditLog);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public AuditLog save(TenantId tenantId, AuditLog auditLog) {
|
||||
if (auditLog.getId() == null) {
|
||||
UUID uuid = Uuids.timeBased();
|
||||
auditLog.setId(new AuditLogId(uuid));
|
||||
auditLog.setCreatedTime(Uuids.unixTimestamp(uuid));
|
||||
}
|
||||
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
return super.save(tenantId, auditLog);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
@ -182,4 +158,9 @@ public class JpaAuditLogDao extends JpaAbstractDao<AuditLogEntity, AuditLog> imp
|
||||
jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createPartition(AuditLogEntity entity) {
|
||||
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -35,7 +35,7 @@ import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
|
||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||
@ -47,7 +47,6 @@ import javax.annotation.PreDestroy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
@ -58,7 +57,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
|
||||
public class JpaBaseEdgeEventDao extends JpaPartitionedAbstractDao<EdgeEventEntity, EdgeEvent> implements EdgeEventDao {
|
||||
|
||||
private final UUID systemTenantId = NULL_UUID;
|
||||
|
||||
@ -151,8 +150,9 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
|
||||
if (StringUtils.isEmpty(edgeEvent.getUid())) {
|
||||
edgeEvent.setUid(edgeEvent.getId().toString());
|
||||
}
|
||||
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, edgeEvent.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
return save(new EdgeEventEntity(edgeEvent));
|
||||
EdgeEventEntity entity = new EdgeEventEntity(edgeEvent);
|
||||
createPartition(entity);
|
||||
return save(entity);
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> save(EdgeEventEntity entity) {
|
||||
@ -227,4 +227,10 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao<EdgeEventEntity, EdgeEve
|
||||
private void callMigrationFunction(long startTime, long endTime, long partitionSIzeInMs) {
|
||||
jdbcTemplate.update("CALL migrate_edge_event(?, ?, ?)", startTime, endTime, partitionSIzeInMs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createPartition(EdgeEventEntity entity) {
|
||||
partitioningRepository.createPartitionIfNotExists(TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,8 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.notification;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.base.Strings;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
@ -34,7 +32,7 @@ import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.model.sql.NotificationEntity;
|
||||
import org.thingsboard.server.dao.notification.NotificationDao;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDao;
|
||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
@ -44,7 +42,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Component
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notification> implements NotificationDao {
|
||||
public class JpaNotificationDao extends JpaPartitionedAbstractDao<NotificationEntity, Notification> implements NotificationDao {
|
||||
|
||||
private final NotificationRepository notificationRepository;
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
@ -52,18 +50,6 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
|
||||
@Value("${sql.notifications.partition_size:168}")
|
||||
private int partitionSizeInHours;
|
||||
|
||||
@Override
|
||||
public Notification save(TenantId tenantId, Notification notification) {
|
||||
if (notification.getId() == null) {
|
||||
UUID uuid = Uuids.timeBased();
|
||||
notification.setId(new NotificationId(uuid));
|
||||
notification.setCreatedTime(Uuids.unixTimestamp(uuid));
|
||||
partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
|
||||
notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
return super.save(tenantId, notification);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Notification> findUnreadByRecipientIdAndPageLink(TenantId tenantId, UserId recipientId, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(notificationRepository.findByRecipientIdAndStatusNot(recipientId.getId(), NotificationStatus.READ,
|
||||
@ -114,6 +100,12 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
|
||||
notificationRepository.deleteByRecipientId(recipientId.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createPartition(NotificationEntity entity) {
|
||||
partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
|
||||
entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<NotificationEntity> getEntityClass() {
|
||||
return NotificationEntity.class;
|
||||
|
||||
@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.OtaPackageInfo;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
@ -58,6 +59,7 @@ public class JpaOtaPackageInfoDao extends JpaAbstractDao<OtaPackageInfoEntity, O
|
||||
return DaoUtil.getData(otaPackageInfoRepository.findOtaPackageInfoById(id));
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public OtaPackageInfo save(TenantId tenantId, OtaPackageInfo otaPackageInfo) {
|
||||
OtaPackageInfo savedOtaPackage = super.save(tenantId, otaPackageInfo);
|
||||
|
||||
@ -85,6 +85,6 @@ public class JpaAlarmCommentDaoTest extends AbstractJpaDaoTest {
|
||||
alarmComment.setUserId(new UserId(userId));
|
||||
alarmComment.setType(type);
|
||||
alarmComment.setComment(JacksonUtil.newObjectNode().put("text", RandomStringUtils.randomAlphanumeric(10)));
|
||||
alarmCommentDao.createAlarmComment(TenantId.fromUUID(UUID.randomUUID()), alarmComment);
|
||||
alarmCommentDao.save(TenantId.fromUUID(UUID.randomUUID()), alarmComment);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user