diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4c70768c87..5015ff090d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -194,6 +194,11 @@ sql: ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}" # Specify thread pool size for FIXED executor service type ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:200}" + # Specify batch size for persisting attribute updates + attributes: + batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}" + batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}" + stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:1000}" # Actor system parameters actors: diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java b/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java new file mode 100644 index 0000000000..da20ff8c21 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Component +public class ScheduledLogExecutorComponent { + + private ScheduledExecutorService schedulerLogExecutor; + + @PostConstruct + public void init() { + schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor(); + } + + @PreDestroy + public void stop() { + if (schedulerLogExecutor != null) { + schedulerLogExecutor.shutdownNow(); + } + } + + public void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + schedulerLogExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java new file mode 100644 index 0000000000..7630ccdaad --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java @@ -0,0 +1,114 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +@Slf4j +public class TbSqlBlockingQueue implements TbSqlQueue { + + private final BlockingQueue> queue = new LinkedBlockingQueue<>(); + private final AtomicInteger addedCount = new AtomicInteger(); + private final AtomicInteger savedCount = new AtomicInteger(); + private final AtomicInteger failedCount = new AtomicInteger(); + private final TbSqlBlockingQueueParams params; + + private ExecutorService executor; + private ScheduledLogExecutorComponent logExecutor; + + public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) { + this.params = params; + } + + @Override + public void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction) { + this.logExecutor = logExecutor; + executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + String logName = params.getLogName(); + int batchSize = params.getBatchSize(); + long maxDelay = params.getMaxDelay(); + List> entities = new ArrayList<>(batchSize); + while (!Thread.interrupted()) { + try { + long currentTs = System.currentTimeMillis(); + TbSqlQueueElement attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS); + if (attr == null) { + continue; + } else { + entities.add(attr); + } + queue.drainTo(entities, batchSize - 1); + boolean fullPack = entities.size() == batchSize; + log.debug("[{}] Going to save {} entities", logName, entities.size()); + saveFunction.accept(entities.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList())); + entities.forEach(v -> v.getFuture().set(null)); + savedCount.addAndGet(entities.size()); + if (!fullPack) { + long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs); + if (remainingDelay > 0) { + Thread.sleep(remainingDelay); + } + } + } catch (Exception e) { + failedCount.addAndGet(entities.size()); + entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e)); + if (e instanceof InterruptedException) { + log.info("[{}] Queue polling was interrupted", logName); + break; + } else { + log.error("[{}] Failed to save {} entities", logName, entities.size(), e); + } + } finally { + entities.clear(); + } + } + }); + + logExecutor.scheduleAtFixedRate(() -> { + log.info("Attributes queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]", + queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0)); + }, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS); + } + + @Override + public void destroy() { + if (executor != null) { + executor.shutdownNow(); + } + } + + @Override + public ListenableFuture add(E element) { + SettableFuture future = SettableFuture.create(); + queue.add(new TbSqlQueueElement<>(future, element)); + addedCount.incrementAndGet(); + return future; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java new file mode 100644 index 0000000000..9afe19a61a --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueueParams.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import lombok.Builder; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Data +@Builder +public class TbSqlBlockingQueueParams { + + private final String logName; + private final int batchSize; + private final long maxDelay; + private final long statsPrintIntervalMs; +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java new file mode 100644 index 0000000000..27c6bc9509 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueue.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.List; +import java.util.function.Consumer; + +public interface TbSqlQueue { + + void init(ScheduledLogExecutorComponent logExecutor, Consumer> saveFunction); + + void destroy(); + + ListenableFuture add(E element); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java new file mode 100644 index 0000000000..7c95d768e7 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql; + +import com.google.common.util.concurrent.SettableFuture; +import lombok.Getter; + +public final class TbSqlQueueElement { + @Getter + private final SettableFuture future; + @Getter + private final E entity; + + public TbSqlQueueElement(SettableFuture future, E entity) { + this.future = future; + this.entity = entity; + } +} + + diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index e50a60aed9..89843048a0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -15,23 +15,51 @@ */ package org.thingsboard.server.dao.sql.attributes; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.jpa.repository.Modifying; +import org.springframework.jdbc.core.BatchPreparedStatementSetter; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.util.SqlDao; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; @SqlDao @Repository +@Slf4j public abstract class AttributeKvInsertRepository { + private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ? " + + "WHERE entity_type = ? and entity_id = ? and attribute_type =? and attribute_key = ?;"; + + private static final String INSERT_OR_UPDATE = + "INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) " + + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ?;"; + protected static final String BOOL_V = "bool_v"; protected static final String STR_V = "str_v"; protected static final String LONG_V = "long_v"; protected static final String DBL_V = "dbl_v"; + @Autowired + protected JdbcTemplate jdbcTemplate; + + @Autowired + private TransactionTemplate transactionTemplate; + @PersistenceContext protected EntityManager entityManager; @@ -99,4 +127,106 @@ public abstract class AttributeKvInsertRepository { .setParameter("last_update_ts", entity.getLastUpdateTs()) .executeUpdate(); } + + protected void saveOrUpdate(List entities) { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + @Override + protected void doInTransactionWithoutResult(TransactionStatus status) { + int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setString(1, entities.get(i).getStrValue()); + + if (entities.get(i).getLongValue() != null) { + ps.setLong(2, entities.get(i).getLongValue()); + } else { + ps.setNull(2, Types.BIGINT); + } + + if (entities.get(i).getDoubleValue() != null) { + ps.setDouble(3, entities.get(i).getDoubleValue()); + } else { + ps.setNull(3, Types.DOUBLE); + } + + if (entities.get(i).getBooleanValue() != null) { + ps.setBoolean(4, entities.get(i).getBooleanValue()); + } else { + ps.setNull(4, Types.BOOLEAN); + } + + ps.setLong(5, entities.get(i).getLastUpdateTs()); + ps.setString(6, entities.get(i).getId().getEntityType().name()); + ps.setString(7, entities.get(i).getId().getEntityId()); + ps.setString(8, entities.get(i).getId().getAttributeType()); + ps.setString(9, entities.get(i).getId().getAttributeKey()); + } + + @Override + public int getBatchSize() { + return entities.size(); + } + }); + + int updatedCount = 0; + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + updatedCount++; + } + } + + List insertEntities = new ArrayList<>(updatedCount); + for (int i = 0; i < result.length; i++) { + if (result[i] == 0) { + insertEntities.add(entities.get(i)); + } + } + + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps, int i) throws SQLException { + ps.setString(1, insertEntities.get(i).getId().getEntityType().name()); + ps.setString(2, insertEntities.get(i).getId().getEntityId()); + ps.setString(3, insertEntities.get(i).getId().getAttributeType()); + ps.setString(4, insertEntities.get(i).getId().getAttributeKey()); + ps.setString(5, insertEntities.get(i).getStrValue()); + ps.setString(10, insertEntities.get(i).getStrValue()); + + if (insertEntities.get(i).getLongValue() != null) { + ps.setLong(6, insertEntities.get(i).getLongValue()); + ps.setLong(11, insertEntities.get(i).getLongValue()); + } else { + ps.setNull(6, Types.BIGINT); + ps.setNull(11, Types.BIGINT); + } + + if (insertEntities.get(i).getDoubleValue() != null) { + ps.setDouble(7, insertEntities.get(i).getDoubleValue()); + ps.setDouble(12, insertEntities.get(i).getDoubleValue()); + } else { + ps.setNull(7, Types.DOUBLE); + ps.setNull(12, Types.DOUBLE); + } + + if (insertEntities.get(i).getBooleanValue() != null) { + ps.setBoolean(8, insertEntities.get(i).getBooleanValue()); + ps.setBoolean(13, insertEntities.get(i).getBooleanValue()); + } else { + ps.setNull(8, Types.BOOLEAN); + ps.setNull(13, Types.BOOLEAN); + } + + ps.setLong(9, insertEntities.get(i).getLastUpdateTs()); + ps.setLong(14, insertEntities.get(i).getLastUpdateTs()); + } + + @Override + public int getBatchSize() { + return insertEntities.size(); + } + }); + } + }); + } + } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlAttributesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlAttributesInsertRepository.java index f2343cd7fe..425832f344 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlAttributesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/HsqlAttributesInsertRepository.java @@ -21,6 +21,9 @@ import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.util.HsqlDao; import org.thingsboard.server.dao.util.SqlDao; +import java.sql.Types; +import java.util.List; + @SqlDao @HsqlDao @Repository @@ -37,6 +40,17 @@ public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository private static final String INSERT_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, ON_LONG_VALUE_UPDATE_SET_NULLS); private static final String INSERT_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE = + "MERGE INTO attribute_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?) " + + "A (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + + "ON (attribute_kv.entity_type=A.entity_type " + + "AND attribute_kv.entity_id=A.entity_id " + + "AND attribute_kv.attribute_type=A.attribute_type " + + "AND attribute_kv.attribute_key=A.attribute_key) " + + "WHEN MATCHED THEN UPDATE SET attribute_kv.str_v = A.str_v, attribute_kv.long_v = A.long_v, attribute_kv.dbl_v = A.dbl_v, attribute_kv.bool_v = A.bool_v, attribute_kv.last_update_ts = A.last_update_ts " + + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " + + "VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.last_update_ts)"; + @Override public void saveOrUpdate(AttributeKvEntity entity) { processSaveOrUpdate(entity, INSERT_BOOL_STATEMENT, INSERT_STR_STATEMENT, INSERT_LONG_STATEMENT, INSERT_DBL_STATEMENT); @@ -45,4 +59,38 @@ public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository private static String getInsertOrUpdateString(String value, String nullValues) { return "MERGE INTO attribute_kv USING(VALUES :entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) A (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) ON (attribute_kv.entity_type=A.entity_type AND attribute_kv.entity_id=A.entity_id AND attribute_kv.attribute_type=A.attribute_type AND attribute_kv.attribute_key=A.attribute_key) WHEN MATCHED THEN UPDATE SET attribute_kv." + value + " = A." + value + ", attribute_kv.last_update_ts = A.last_update_ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A." + value + ", A.last_update_ts)"; } + + + @Override + protected void saveOrUpdate(List entities) { + entities.forEach(entity -> { + jdbcTemplate.update(INSERT_OR_UPDATE, ps -> { + ps.setString(1, entity.getId().getEntityType().name()); + ps.setString(2, entity.getId().getEntityId()); + ps.setString(3, entity.getId().getAttributeType()); + ps.setString(4, entity.getId().getAttributeKey()); + ps.setString(5, entity.getStrValue()); + + if (entity.getLongValue() != null) { + ps.setLong(6, entity.getLongValue()); + } else { + ps.setNull(6, Types.BIGINT); + } + + if (entity.getDoubleValue() != null) { + ps.setDouble(7, entity.getDoubleValue()); + } else { + ps.setNull(7, Types.DOUBLE); + } + + if (entity.getBooleanValue() != null) { + ps.setBoolean(8, entity.getBooleanValue()); + } else { + ps.setNull(8, Types.BOOLEAN); + } + + ps.setLong(9, entity.getLastUpdateTs()); + }); + }); + } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index b8ad6f8271..2b8c7e5ab2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.UUIDConverter; import org.thingsboard.server.common.data.id.EntityId; @@ -30,8 +31,13 @@ import org.thingsboard.server.dao.attributes.AttributesDao; import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; +import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueue; +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.util.SqlDao; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -44,12 +50,45 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; @SqlDao public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService implements AttributesDao { + @Autowired + ScheduledLogExecutorComponent logExecutor; + @Autowired private AttributeKvRepository attributeKvRepository; @Autowired private AttributeKvInsertRepository attributeKvInsertRepository; + @Value("${sql.attributes.batch_size:1000}") + private int batchSize; + + @Value("${sql.attributes.batch_max_delay:100}") + private long maxDelay; + + @Value("${sql.attributes.stats_print_interval_ms:1000}") + private long statsPrintIntervalMs; + + private TbSqlBlockingQueue queue; + + @PostConstruct + private void init() { + TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() + .logName("Attributes") + .batchSize(batchSize) + .maxDelay(maxDelay) + .statsPrintIntervalMs(statsPrintIntervalMs) + .build(); + queue = new TbSqlBlockingQueue<>(params); + queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v)); + } + + @PreDestroy + private void destroy() { + if (queue != null) { + queue.destroy(); + } + } + @Override public ListenableFuture> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey) { AttributeKvCompositeKey compositeKey = @@ -89,12 +128,12 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); entity.setLongValue(attribute.getLongValue().orElse(null)); entity.setBooleanValue(attribute.getBooleanValue().orElse(null)); - return service.submit(() -> { - attributeKvInsertRepository.saveOrUpdate(entity); - return null; - }); + return addToQueue(entity); } + private ListenableFuture addToQueue(AttributeKvEntity entity) { + return queue.add(entity); + } @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys) {