Batch attribute updates

* added batch support to AttributeKvInsertRepository

* added logs for testing

* added batch support to AttributeKvInsertRepository

* added logs for testing

* Code review part 1

* Improvements

* Batch Update Implementation

* added realization SaveOrUpdate batch to Hsql and refactored

* refactored
This commit is contained in:
Andrew Shvayka 2019-11-22 15:11:32 +02:00 committed by GitHub
parent b67170f072
commit 1d29a8d9a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 480 additions and 4 deletions

View File

@ -194,6 +194,11 @@ sql:
ts_inserts_executor_type: "${SQL_TS_INSERTS_EXECUTOR_TYPE:fixed}"
# Specify thread pool size for FIXED executor service type
ts_inserts_fixed_thread_pool_size: "${SQL_TS_INSERTS_FIXED_THREAD_POOL_SIZE:200}"
# Specify batch size for persisting attribute updates
attributes:
batch_size: "${SQL_ATTRIBUTES_BATCH_SIZE:10000}"
batch_max_delay: "${SQL_ATTRIBUTES_BATCH_MAX_DELAY_MS:100}"
stats_print_interval_ms: "${SQL_ATTRIBUTES_BATCH_STATS_PRINT_MS:1000}"
# Actor system parameters
actors:

View File

@ -0,0 +1,46 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
public class ScheduledLogExecutorComponent {
private ScheduledExecutorService schedulerLogExecutor;
@PostConstruct
public void init() {
schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor();
}
@PreDestroy
public void stop() {
if (schedulerLogExecutor != null) {
schedulerLogExecutor.shutdownNow();
}
}
public void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
schedulerLogExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
}
}

View File

@ -0,0 +1,114 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@Slf4j
public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
private final AtomicInteger addedCount = new AtomicInteger();
private final AtomicInteger savedCount = new AtomicInteger();
private final AtomicInteger failedCount = new AtomicInteger();
private final TbSqlBlockingQueueParams params;
private ExecutorService executor;
private ScheduledLogExecutorComponent logExecutor;
public TbSqlBlockingQueue(TbSqlBlockingQueueParams params) {
this.params = params;
}
@Override
public void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction) {
this.logExecutor = logExecutor;
executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String logName = params.getLogName();
int batchSize = params.getBatchSize();
long maxDelay = params.getMaxDelay();
List<TbSqlQueueElement<E>> entities = new ArrayList<>(batchSize);
while (!Thread.interrupted()) {
try {
long currentTs = System.currentTimeMillis();
TbSqlQueueElement<E> attr = queue.poll(maxDelay, TimeUnit.MILLISECONDS);
if (attr == null) {
continue;
} else {
entities.add(attr);
}
queue.drainTo(entities, batchSize - 1);
boolean fullPack = entities.size() == batchSize;
log.debug("[{}] Going to save {} entities", logName, entities.size());
saveFunction.accept(entities.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
entities.forEach(v -> v.getFuture().set(null));
savedCount.addAndGet(entities.size());
if (!fullPack) {
long remainingDelay = maxDelay - (System.currentTimeMillis() - currentTs);
if (remainingDelay > 0) {
Thread.sleep(remainingDelay);
}
}
} catch (Exception e) {
failedCount.addAndGet(entities.size());
entities.forEach(entityFutureWrapper -> entityFutureWrapper.getFuture().setException(e));
if (e instanceof InterruptedException) {
log.info("[{}] Queue polling was interrupted", logName);
break;
} else {
log.error("[{}] Failed to save {} entities", logName, entities.size(), e);
}
} finally {
entities.clear();
}
}
});
logExecutor.scheduleAtFixedRate(() -> {
log.info("Attributes queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
queue.size(), addedCount.getAndSet(0), savedCount.getAndSet(0), failedCount.getAndSet(0));
}, params.getStatsPrintIntervalMs(), params.getStatsPrintIntervalMs(), TimeUnit.MILLISECONDS);
}
@Override
public void destroy() {
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public ListenableFuture<Void> add(E element) {
SettableFuture<Void> future = SettableFuture.create();
queue.add(new TbSqlQueueElement<>(future, element));
addedCount.incrementAndGet();
return future;
}
}

View File

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
@Builder
public class TbSqlBlockingQueueParams {
private final String logName;
private final int batchSize;
private final long maxDelay;
private final long statsPrintIntervalMs;
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.function.Consumer;
public interface TbSqlQueue<E> {
void init(ScheduledLogExecutorComponent logExecutor, Consumer<List<E>> saveFunction);
void destroy();
ListenableFuture<Void> add(E element);
}

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Getter;
public final class TbSqlQueueElement<E> {
@Getter
private final SettableFuture<Void> future;
@Getter
private final E entity;
public TbSqlQueueElement(SettableFuture<Void> future, E entity) {
this.future = future;
this.entity = entity;
}
}

View File

@ -15,23 +15,51 @@
*/
package org.thingsboard.server.dao.sql.attributes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.util.SqlDao;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@SqlDao
@Repository
@Slf4j
public abstract class AttributeKvInsertRepository {
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ? " +
"WHERE entity_type = ? and entity_id = ? and attribute_type =? and attribute_key = ?;";
private static final String INSERT_OR_UPDATE =
"INSERT INTO attribute_kv (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT (entity_type, entity_id, attribute_type, attribute_key) " +
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, last_update_ts = ?;";
protected static final String BOOL_V = "bool_v";
protected static final String STR_V = "str_v";
protected static final String LONG_V = "long_v";
protected static final String DBL_V = "dbl_v";
@Autowired
protected JdbcTemplate jdbcTemplate;
@Autowired
private TransactionTemplate transactionTemplate;
@PersistenceContext
protected EntityManager entityManager;
@ -99,4 +127,106 @@ public abstract class AttributeKvInsertRepository {
.setParameter("last_update_ts", entity.getLastUpdateTs())
.executeUpdate();
}
protected void saveOrUpdate(List<AttributeKvEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, entities.get(i).getStrValue());
if (entities.get(i).getLongValue() != null) {
ps.setLong(2, entities.get(i).getLongValue());
} else {
ps.setNull(2, Types.BIGINT);
}
if (entities.get(i).getDoubleValue() != null) {
ps.setDouble(3, entities.get(i).getDoubleValue());
} else {
ps.setNull(3, Types.DOUBLE);
}
if (entities.get(i).getBooleanValue() != null) {
ps.setBoolean(4, entities.get(i).getBooleanValue());
} else {
ps.setNull(4, Types.BOOLEAN);
}
ps.setLong(5, entities.get(i).getLastUpdateTs());
ps.setString(6, entities.get(i).getId().getEntityType().name());
ps.setString(7, entities.get(i).getId().getEntityId());
ps.setString(8, entities.get(i).getId().getAttributeType());
ps.setString(9, entities.get(i).getId().getAttributeKey());
}
@Override
public int getBatchSize() {
return entities.size();
}
});
int updatedCount = 0;
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
updatedCount++;
}
}
List<AttributeKvEntity> insertEntities = new ArrayList<>(updatedCount);
for (int i = 0; i < result.length; i++) {
if (result[i] == 0) {
insertEntities.add(entities.get(i));
}
}
jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, insertEntities.get(i).getId().getEntityType().name());
ps.setString(2, insertEntities.get(i).getId().getEntityId());
ps.setString(3, insertEntities.get(i).getId().getAttributeType());
ps.setString(4, insertEntities.get(i).getId().getAttributeKey());
ps.setString(5, insertEntities.get(i).getStrValue());
ps.setString(10, insertEntities.get(i).getStrValue());
if (insertEntities.get(i).getLongValue() != null) {
ps.setLong(6, insertEntities.get(i).getLongValue());
ps.setLong(11, insertEntities.get(i).getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
ps.setNull(11, Types.BIGINT);
}
if (insertEntities.get(i).getDoubleValue() != null) {
ps.setDouble(7, insertEntities.get(i).getDoubleValue());
ps.setDouble(12, insertEntities.get(i).getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
ps.setNull(12, Types.DOUBLE);
}
if (insertEntities.get(i).getBooleanValue() != null) {
ps.setBoolean(8, insertEntities.get(i).getBooleanValue());
ps.setBoolean(13, insertEntities.get(i).getBooleanValue());
} else {
ps.setNull(8, Types.BOOLEAN);
ps.setNull(13, Types.BOOLEAN);
}
ps.setLong(9, insertEntities.get(i).getLastUpdateTs());
ps.setLong(14, insertEntities.get(i).getLastUpdateTs());
}
@Override
public int getBatchSize() {
return insertEntities.size();
}
});
}
});
}
}

View File

@ -21,6 +21,9 @@ import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlDao;
import java.sql.Types;
import java.util.List;
@SqlDao
@HsqlDao
@Repository
@ -37,6 +40,17 @@ public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository
private static final String INSERT_LONG_STATEMENT = getInsertOrUpdateString(LONG_V, ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE =
"MERGE INTO attribute_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"A (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " +
"ON (attribute_kv.entity_type=A.entity_type " +
"AND attribute_kv.entity_id=A.entity_id " +
"AND attribute_kv.attribute_type=A.attribute_type " +
"AND attribute_kv.attribute_key=A.attribute_key) " +
"WHEN MATCHED THEN UPDATE SET attribute_kv.str_v = A.str_v, attribute_kv.long_v = A.long_v, attribute_kv.dbl_v = A.dbl_v, attribute_kv.bool_v = A.bool_v, attribute_kv.last_update_ts = A.last_update_ts " +
"WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, last_update_ts) " +
"VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.last_update_ts)";
@Override
public void saveOrUpdate(AttributeKvEntity entity) {
processSaveOrUpdate(entity, INSERT_BOOL_STATEMENT, INSERT_STR_STATEMENT, INSERT_LONG_STATEMENT, INSERT_DBL_STATEMENT);
@ -45,4 +59,38 @@ public class HsqlAttributesInsertRepository extends AttributeKvInsertRepository
private static String getInsertOrUpdateString(String value, String nullValues) {
return "MERGE INTO attribute_kv USING(VALUES :entity_type, :entity_id, :attribute_type, :attribute_key, :" + value + ", :last_update_ts) A (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) ON (attribute_kv.entity_type=A.entity_type AND attribute_kv.entity_id=A.entity_id AND attribute_kv.attribute_type=A.attribute_type AND attribute_kv.attribute_key=A.attribute_key) WHEN MATCHED THEN UPDATE SET attribute_kv." + value + " = A." + value + ", attribute_kv.last_update_ts = A.last_update_ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, attribute_type, attribute_key, " + value + ", last_update_ts) VALUES (A.entity_type, A.entity_id, A.attribute_type, A.attribute_key, A." + value + ", A.last_update_ts)";
}
@Override
protected void saveOrUpdate(List<AttributeKvEntity> entities) {
entities.forEach(entity -> {
jdbcTemplate.update(INSERT_OR_UPDATE, ps -> {
ps.setString(1, entity.getId().getEntityType().name());
ps.setString(2, entity.getId().getEntityId());
ps.setString(3, entity.getId().getAttributeType());
ps.setString(4, entity.getId().getAttributeKey());
ps.setString(5, entity.getStrValue());
if (entity.getLongValue() != null) {
ps.setLong(6, entity.getLongValue());
} else {
ps.setNull(6, Types.BIGINT);
}
if (entity.getDoubleValue() != null) {
ps.setDouble(7, entity.getDoubleValue());
} else {
ps.setNull(7, Types.DOUBLE);
}
if (entity.getBooleanValue() != null) {
ps.setBoolean(8, entity.getBooleanValue());
} else {
ps.setNull(8, Types.BOOLEAN);
}
ps.setLong(9, entity.getLastUpdateTs());
});
});
}
}

View File

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.UUIDConverter;
import org.thingsboard.server.common.data.id.EntityId;
@ -30,8 +31,13 @@ import org.thingsboard.server.dao.attributes.AttributesDao;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueue;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@ -44,12 +50,45 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID;
@SqlDao
public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService implements AttributesDao {
@Autowired
ScheduledLogExecutorComponent logExecutor;
@Autowired
private AttributeKvRepository attributeKvRepository;
@Autowired
private AttributeKvInsertRepository attributeKvInsertRepository;
@Value("${sql.attributes.batch_size:1000}")
private int batchSize;
@Value("${sql.attributes.batch_max_delay:100}")
private long maxDelay;
@Value("${sql.attributes.stats_print_interval_ms:1000}")
private long statsPrintIntervalMs;
private TbSqlBlockingQueue<AttributeKvEntity> queue;
@PostConstruct
private void init() {
TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
.logName("Attributes")
.batchSize(batchSize)
.maxDelay(maxDelay)
.statsPrintIntervalMs(statsPrintIntervalMs)
.build();
queue = new TbSqlBlockingQueue<>(params);
queue.init(logExecutor, v -> attributeKvInsertRepository.saveOrUpdate(v));
}
@PreDestroy
private void destroy() {
if (queue != null) {
queue.destroy();
}
}
@Override
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String attributeType, String attributeKey) {
AttributeKvCompositeKey compositeKey =
@ -89,12 +128,12 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
entity.setLongValue(attribute.getLongValue().orElse(null));
entity.setBooleanValue(attribute.getBooleanValue().orElse(null));
return service.submit(() -> {
attributeKvInsertRepository.saveOrUpdate(entity);
return null;
});
return addToQueue(entity);
}
private ListenableFuture<Void> addToQueue(AttributeKvEntity entity) {
return queue.add(entity);
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List<String> keys) {