Separate dao implementations for dedicated datasource
This commit is contained in:
parent
868b480ff9
commit
f53f5e4fbb
@ -17,6 +17,7 @@ package org.thingsboard.server.controller;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -38,7 +39,7 @@ import org.thingsboard.server.common.data.security.Authority;
|
||||
import org.thingsboard.server.dao.audit.AuditLogDao;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.service.ttl.AuditLogsCleanUpService;
|
||||
|
||||
import java.text.ParseException;
|
||||
@ -64,8 +65,9 @@ public class AuditLogControllerTest extends AbstractControllerTest {
|
||||
|
||||
@Autowired
|
||||
private AuditLogDao auditLogDao;
|
||||
@Getter
|
||||
@SpyBean
|
||||
private DedicatedSqlPartitioningRepository partitioningRepository;
|
||||
private SqlPartitioningRepository partitioningRepository;
|
||||
@SpyBean
|
||||
private AuditLogsCleanUpService auditLogsCleanUpService;
|
||||
|
||||
@ -183,12 +185,12 @@ public class AuditLogControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() throws ParseException {
|
||||
long entityTs = ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2024-01-01T01:43:11Z").getTime();
|
||||
reset(partitioningRepository);
|
||||
reset(getPartitioningRepository());
|
||||
AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId, entityTs);
|
||||
verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs));
|
||||
verify(getPartitioningRepository()).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs));
|
||||
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
assertThat(partitions).contains(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs));
|
||||
List<Long> partitions = getPartitioningRepository().fetchPartitions("audit_log");
|
||||
assertThat(partitions).contains(getPartitioningRepository().calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -197,15 +199,15 @@ public class AuditLogControllerTest extends AbstractControllerTest {
|
||||
final long oldAuditLogTs = ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2020-10-01T00:00:00Z").getTime();
|
||||
final long currentTimeMillis = oldAuditLogTs + TimeUnit.SECONDS.toMillis(auditLogsTtlInSec) * 2;
|
||||
|
||||
final long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs);
|
||||
partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs);
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
final long partitionStartTs = getPartitioningRepository().calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs);
|
||||
getPartitioningRepository().createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs);
|
||||
List<Long> partitions = getPartitioningRepository().fetchPartitions("audit_log");
|
||||
assertThat(partitions).contains(partitionStartTs);
|
||||
|
||||
willReturn(currentTimeMillis).given(auditLogsCleanUpService).getCurrentTimeMillis();
|
||||
auditLogsCleanUpService.cleanUp();
|
||||
|
||||
partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
partitions = getPartitioningRepository().fetchPartitions("audit_log");
|
||||
assertThat(partitions).as("partitions cleared").doesNotContain(partitionStartTs);
|
||||
assertThat(partitions).as("only newer partitions left").allSatisfy(partitionsStart -> {
|
||||
long partitionEndTs = partitionsStart + partitionDurationInMs;
|
||||
@ -218,17 +220,17 @@ public class AuditLogControllerTest extends AbstractControllerTest {
|
||||
// creating partition bigger than sql.audit_logs.partition_size
|
||||
long entityTs = ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-04-29T07:43:11Z").getTime();
|
||||
//the partition 7 days is overlapping default partition size 1 day, use in the far past to not affect other tests
|
||||
partitioningRepository.createPartitionIfNotExists("audit_log", entityTs, TimeUnit.DAYS.toMillis(7));
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
getPartitioningRepository().createPartitionIfNotExists("audit_log", entityTs, TimeUnit.DAYS.toMillis(7));
|
||||
List<Long> partitions = getPartitioningRepository().fetchPartitions("audit_log");
|
||||
log.warn("entityTs [{}], fetched partitions {}", entityTs, partitions);
|
||||
assertThat(partitions).contains(ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-04-28T00:00:00Z").getTime());
|
||||
partitioningRepository.cleanupPartitionsCache("audit_log", entityTs, 0);
|
||||
getPartitioningRepository().cleanupPartitionsCache("audit_log", entityTs, 0);
|
||||
|
||||
assertDoesNotThrow(() -> {
|
||||
// expecting partition overlap error on partition save
|
||||
createAuditLog(ActionType.LOGIN, tenantAdminUserId, entityTs);
|
||||
});
|
||||
assertThat(partitioningRepository.fetchPartitions("audit_log"))
|
||||
assertThat(getPartitioningRepository().fetchPartitions("audit_log"))
|
||||
.contains(ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.parse("2022-04-28T00:00:00Z").getTime());
|
||||
}
|
||||
|
||||
|
||||
@ -15,8 +15,11 @@
|
||||
*/
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
|
||||
@DaoSqlTest
|
||||
@TestPropertySource(properties = {
|
||||
@ -25,4 +28,9 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
"spring.datasource.dedicated.driverClassName=${spring.datasource.driverClassName}",
|
||||
})
|
||||
public class AuditLogControllerTest_DedicatedDataSource extends AuditLogControllerTest {
|
||||
|
||||
@Getter
|
||||
@SpyBean
|
||||
private DedicatedSqlPartitioningRepository partitioningRepository;
|
||||
|
||||
}
|
||||
|
||||
@ -13,10 +13,14 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
package org.thingsboard.server.dao.config;
|
||||
|
||||
public interface EventCleanupRepository {
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
||||
void cleanupEvents(long eventExpTime, boolean debug);
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@ConditionalOnProperty(value = "spring.datasource.dedicated.enabled", havingValue = "true")
|
||||
public @interface DedicatedDataSource {
|
||||
}
|
||||
@ -17,7 +17,6 @@ package org.thingsboard.server.dao.config;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
|
||||
@ -46,7 +45,7 @@ import java.util.Objects;
|
||||
* - add the package of this JpaRepository to @EnableJpaRepositories in DedicatedJpaDaoConfig
|
||||
* - add the entity class to packages list in dedicatedEntityManagerFactory in DedicatedJpaDaoConfig
|
||||
* */
|
||||
@ConditionalOnProperty(value = "spring.datasource.dedicated.enabled", havingValue = "true")
|
||||
@DedicatedDataSource
|
||||
@Configuration
|
||||
@EnableJpaRepositories(value = {"org.thingsboard.server.dao.sql.event", "org.thingsboard.server.dao.sql.audit"},
|
||||
bootstrapMode = BootstrapMode.LAZY,
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.config;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@ConditionalOnProperty(value = "spring.datasource.dedicated.enabled", havingValue = "false", matchIfMissing = true)
|
||||
public @interface DefaultDataSource {
|
||||
}
|
||||
@ -15,38 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.config;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||||
import org.springframework.data.repository.config.BootstrapMode;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_MANAGER;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_TEMPLATE;
|
||||
|
||||
@ConditionalOnProperty(value = "spring.datasource.dedicated.enabled", havingValue = "false", matchIfMissing = true)
|
||||
@DefaultDataSource
|
||||
@Configuration
|
||||
@EnableJpaRepositories(value = {"org.thingsboard.server.dao.sql.event", "org.thingsboard.server.dao.sql.audit"}, bootstrapMode = BootstrapMode.LAZY)
|
||||
public class DefaultDedicatedJpaDaoConfig {
|
||||
|
||||
@Bean(DEDICATED_JDBC_TEMPLATE)
|
||||
public JdbcTemplate dedicatedJdbcTemplate(@Qualifier("jdbcTemplate") JdbcTemplate defaultJdbcTemplate) {
|
||||
return defaultJdbcTemplate;
|
||||
}
|
||||
|
||||
@Bean(DEDICATED_TRANSACTION_MANAGER)
|
||||
public JpaTransactionManager dedicatedTransactionManager(@Qualifier("transactionManager") JpaTransactionManager defaultTransactionManager) {
|
||||
return defaultTransactionManager;
|
||||
}
|
||||
|
||||
@Bean(DEDICATED_TRANSACTION_TEMPLATE)
|
||||
public TransactionTemplate dedicatedTransactionTemplate(@Qualifier("transactionTemplate") TransactionTemplate defaultTransactionTemplate) {
|
||||
return defaultTransactionTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.audit;
|
||||
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.PersistenceContext;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.dao.config.DedicatedDataSource;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_PERSISTENCE_UNIT;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_MANAGER;
|
||||
|
||||
@DedicatedDataSource
|
||||
@Component
|
||||
@SqlDao
|
||||
public class DedicatedJpaAuditLogDao extends JpaAuditLogDao {
|
||||
|
||||
@Autowired
|
||||
@Qualifier(DEDICATED_JDBC_TEMPLATE)
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@PersistenceContext(unitName = DEDICATED_PERSISTENCE_UNIT)
|
||||
private EntityManager entityManager;
|
||||
|
||||
public DedicatedJpaAuditLogDao(AuditLogRepository auditLogRepository, DedicatedSqlPartitioningRepository partitioningRepository) {
|
||||
super(auditLogRepository, partitioningRepository);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public AuditLog save(TenantId tenantId, AuditLog domain) {
|
||||
return super.save(tenantId, domain);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public AuditLog saveAndFlush(TenantId tenantId, AuditLog domain) {
|
||||
return super.saveAndFlush(tenantId, domain);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public boolean removeById(TenantId tenantId, UUID id) {
|
||||
return super.removeById(tenantId, id);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public void removeAllByIds(Collection<UUID> ids) {
|
||||
super.removeAllByIds(ids);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EntityManager getEntityManager() {
|
||||
return entityManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JdbcTemplate getJdbcTemplate() {
|
||||
return jdbcTemplate;
|
||||
}
|
||||
|
||||
}
|
||||
@ -15,42 +15,33 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.audit;
|
||||
|
||||
import jakarta.persistence.EntityManager;
|
||||
import jakarta.persistence.PersistenceContext;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.audit.AuditLog;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.audit.AuditLogDao;
|
||||
import org.thingsboard.server.dao.config.DefaultDataSource;
|
||||
import org.thingsboard.server.dao.model.sql.AuditLogEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaPartitionedAbstractDao;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_PERSISTENCE_UNIT;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_MANAGER;
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_TABLE_NAME;
|
||||
|
||||
@DefaultDataSource
|
||||
@Component
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
@ -58,40 +49,11 @@ import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_TABLE_NA
|
||||
public class JpaAuditLogDao extends JpaPartitionedAbstractDao<AuditLogEntity, AuditLog> implements AuditLogDao {
|
||||
|
||||
private final AuditLogRepository auditLogRepository;
|
||||
private final DedicatedSqlPartitioningRepository partitioningRepository;
|
||||
@Autowired
|
||||
@Qualifier(DEDICATED_JDBC_TEMPLATE)
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
@PersistenceContext(unitName = DEDICATED_PERSISTENCE_UNIT)
|
||||
private EntityManager entityManager;
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
|
||||
@Value("${sql.audit_logs.partition_size:168}")
|
||||
private int partitionSizeInHours;
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public AuditLog save(TenantId tenantId, AuditLog domain) {
|
||||
return super.save(tenantId, domain);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public AuditLog saveAndFlush(TenantId tenantId, AuditLog domain) {
|
||||
return super.saveAndFlush(tenantId, domain);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public boolean removeById(TenantId tenantId, UUID id) {
|
||||
return super.removeById(tenantId, id);
|
||||
}
|
||||
|
||||
@Transactional(transactionManager = DEDICATED_TRANSACTION_MANAGER)
|
||||
@Override
|
||||
public void removeAllByIds(Collection<UUID> ids) {
|
||||
super.removeAllByIds(ids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<AuditLog> findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List<ActionType> actionTypes, TimePageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
@ -157,19 +119,6 @@ public class JpaAuditLogDao extends JpaPartitionedAbstractDao<AuditLogEntity, Au
|
||||
partitioningRepository.createPartitionIfNotExists(AUDIT_LOG_TABLE_NAME, entity.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EntityManager getEntityManager() {
|
||||
if (entityManager != null) {
|
||||
return entityManager;
|
||||
}
|
||||
return super.getEntityManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JdbcTemplate getJdbcTemplate() {
|
||||
return jdbcTemplate;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<AuditLogEntity> getEntityClass() {
|
||||
return AuditLogEntity.class;
|
||||
|
||||
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
import org.thingsboard.server.dao.config.DedicatedDataSource;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_TEMPLATE;
|
||||
|
||||
@DedicatedDataSource
|
||||
@Repository
|
||||
public class DedicatedEventInsertRepository extends EventInsertRepository {
|
||||
|
||||
public DedicatedEventInsertRepository(@Qualifier(DEDICATED_JDBC_TEMPLATE) JdbcTemplate jdbcTemplate,
|
||||
@Qualifier(DEDICATED_TRANSACTION_TEMPLATE) TransactionTemplate transactionTemplate) {
|
||||
super(jdbcTemplate, transactionTemplate);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.config.DedicatedDataSource;
|
||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
@DedicatedDataSource
|
||||
@Component
|
||||
@SqlDao
|
||||
public class DedicatedJpaEventDao extends JpaBaseEventDao {
|
||||
|
||||
public DedicatedJpaEventDao(EventPartitionConfiguration partitionConfiguration,
|
||||
DedicatedSqlPartitioningRepository partitioningRepository,
|
||||
LifecycleEventRepository lcEventRepository,
|
||||
StatisticsEventRepository statsEventRepository,
|
||||
ErrorEventRepository errorEventRepository,
|
||||
DedicatedEventInsertRepository eventInsertRepository,
|
||||
RuleNodeDebugEventRepository ruleNodeDebugEventRepository,
|
||||
RuleChainDebugEventRepository ruleChainDebugEventRepository,
|
||||
ScheduledLogExecutorComponent logExecutor,
|
||||
StatsFactory statsFactory) {
|
||||
super(partitionConfiguration, partitioningRepository, lcEventRepository, statsEventRepository,
|
||||
errorEventRepository, eventInsertRepository, ruleNodeDebugEventRepository,
|
||||
ruleChainDebugEventRepository, logExecutor, statsFactory);
|
||||
}
|
||||
|
||||
}
|
||||
@ -16,9 +16,7 @@
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.Getter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
@ -34,6 +32,7 @@ import org.thingsboard.server.common.data.event.LifecycleEvent;
|
||||
import org.thingsboard.server.common.data.event.RuleChainDebugEvent;
|
||||
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
|
||||
import org.thingsboard.server.common.data.event.StatisticsEvent;
|
||||
import org.thingsboard.server.dao.config.DefaultDataSource;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
@ -46,12 +45,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_TEMPLATE;
|
||||
|
||||
@DefaultDataSource
|
||||
@Repository
|
||||
@Transactional
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
public class EventInsertRepository {
|
||||
|
||||
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
|
||||
@ -60,14 +58,8 @@ public class EventInsertRepository {
|
||||
|
||||
private final Map<EventType, String> insertStmtMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Getter
|
||||
@Autowired
|
||||
@Qualifier(DEDICATED_JDBC_TEMPLATE)
|
||||
protected JdbcTemplate jdbcTemplate;
|
||||
|
||||
@Autowired
|
||||
@Qualifier(DEDICATED_TRANSACTION_TEMPLATE)
|
||||
private TransactionTemplate transactionTemplate;
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final TransactionTemplate transactionTemplate;
|
||||
|
||||
@Value("${sql.remove_null_chars:true}")
|
||||
private boolean removeNullChars;
|
||||
@ -244,4 +236,5 @@ public class EventInsertRepository {
|
||||
}
|
||||
return strValue;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -38,12 +38,13 @@ import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.config.DefaultDataSource;
|
||||
import org.thingsboard.server.dao.event.EventDao;
|
||||
import org.thingsboard.server.dao.model.sql.EventEntity;
|
||||
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
|
||||
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
|
||||
import org.thingsboard.server.dao.util.SqlDao;
|
||||
|
||||
import java.util.Comparator;
|
||||
@ -54,22 +55,19 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Created by Valerii Sosliuk on 5/3/2017.
|
||||
*/
|
||||
@Slf4j
|
||||
@DefaultDataSource
|
||||
@Component
|
||||
@SqlDao
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class JpaBaseEventDao implements EventDao {
|
||||
|
||||
private final EventPartitionConfiguration partitionConfiguration;
|
||||
private final DedicatedSqlPartitioningRepository partitioningRepository;
|
||||
private final SqlPartitioningRepository partitioningRepository;
|
||||
private final LifecycleEventRepository lcEventRepository;
|
||||
private final StatisticsEventRepository statsEventRepository;
|
||||
private final ErrorEventRepository errorEventRepository;
|
||||
private final EventInsertRepository eventInsertRepository;
|
||||
private final EventCleanupRepository eventCleanupRepository;
|
||||
private final RuleNodeDebugEventRepository ruleNodeDebugEventRepository;
|
||||
private final RuleChainDebugEventRepository ruleChainDebugEventRepository;
|
||||
private final ScheduledLogExecutorComponent logExecutor;
|
||||
@ -377,7 +375,7 @@ public class JpaBaseEventDao implements EventDao {
|
||||
if (regularEventExpTs > 0) {
|
||||
log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs);
|
||||
if (cleanupDb) {
|
||||
eventCleanupRepository.cleanupEvents(regularEventExpTs, false);
|
||||
cleanupEvents(regularEventExpTs, false);
|
||||
} else {
|
||||
cleanupPartitionsCache(regularEventExpTs, false);
|
||||
}
|
||||
@ -385,13 +383,25 @@ public class JpaBaseEventDao implements EventDao {
|
||||
if (debugEventExpTs > 0) {
|
||||
log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs);
|
||||
if (cleanupDb) {
|
||||
eventCleanupRepository.cleanupEvents(debugEventExpTs, true);
|
||||
cleanupEvents(debugEventExpTs, true);
|
||||
} else {
|
||||
cleanupPartitionsCache(debugEventExpTs, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupEvents(long eventExpTime, boolean debug) {
|
||||
for (EventType eventType : EventType.values()) {
|
||||
if (eventType.isDebug() == debug) {
|
||||
cleanupPartitions(eventType, eventExpTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupPartitions(EventType eventType, long eventExpTime) {
|
||||
partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType));
|
||||
}
|
||||
|
||||
private void cleanupPartitionsCache(long expTime, boolean isDebug) {
|
||||
for (EventType eventType : EventType.values()) {
|
||||
if (eventType.isDebug() == isDebug) {
|
||||
|
||||
@ -1,47 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.event;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.thingsboard.server.common.data.event.EventType;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
|
||||
import org.thingsboard.server.dao.sqlts.insert.sql.DedicatedSqlPartitioningRepository;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository {
|
||||
|
||||
private final EventPartitionConfiguration partitionConfiguration;
|
||||
private final DedicatedSqlPartitioningRepository partitioningRepository;
|
||||
|
||||
@Override
|
||||
public void cleanupEvents(long eventExpTime, boolean debug) {
|
||||
for (EventType eventType : EventType.values()) {
|
||||
if (eventType.isDebug() == debug) {
|
||||
cleanupEvents(eventType, eventExpTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupEvents(EventType eventType, long eventExpTime) {
|
||||
partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType));
|
||||
}
|
||||
|
||||
}
|
||||
@ -21,11 +21,13 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.config.DedicatedDataSource;
|
||||
import org.thingsboard.server.dao.timeseries.SqlPartition;
|
||||
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_JDBC_TEMPLATE;
|
||||
import static org.thingsboard.server.dao.config.DedicatedJpaDaoConfig.DEDICATED_TRANSACTION_MANAGER;
|
||||
|
||||
@DedicatedDataSource
|
||||
@Repository
|
||||
public class DedicatedSqlPartitioningRepository extends SqlPartitioningRepository {
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user