Refactor TbRocksDb usage

This commit is contained in:
ViacheslavKlimov 2025-02-28 17:22:53 +02:00
parent 43f9083f70
commit faff0ea8c2
4 changed files with 35 additions and 87 deletions

View File

@ -15,22 +15,29 @@
*/
package org.thingsboard.server.service.cf;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.rocksdb.Options;
import org.rocksdb.WriteOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.utils.TbRocksDb;
import org.thingsboard.server.edqs.util.TbRocksDb;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='in-memory'")
public class CfRocksDb extends TbRocksDb {
public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) throws Exception {
public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) {
super(path, new Options().setCreateIfMissing(true), new WriteOptions().setSync(true));
}
@PostConstruct
@Override
public void init() {
super.init();
}
@PreDestroy
@Override
public void close() {

View File

@ -1,71 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.utils;
import lombok.SneakyThrows;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BiConsumer;
public class TbRocksDb {
protected final String path;
private final WriteOptions writeOptions;
protected final RocksDB db;
static {
RocksDB.loadLibrary();
}
public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) throws Exception {
this.path = path;
this.writeOptions = writeOptions;
Files.createDirectories(Path.of(path).getParent());
this.db = RocksDB.open(dbOptions, path);
}
@SneakyThrows
public void put(String key, byte[] value) {
db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value);
}
public void forEach(BiConsumer<String, byte[]> processor) {
try (RocksIterator iterator = db.newIterator()) {
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
String key = new String(iterator.key(), StandardCharsets.UTF_8);
processor.accept(key, iterator.value());
}
}
}
@SneakyThrows
public void delete(String key) {
db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8));
}
public void close() {
if (db != null) {
db.close();
}
}
}

View File

@ -19,6 +19,7 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import org.rocksdb.Options;
import org.rocksdb.WriteOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent;
@ -34,16 +35,18 @@ public class EdqsRocksDb extends TbRocksDb {
private boolean isNew;
public EdqsRocksDb(@Value("${queue.edqs.local.rocksdb_path:${user.home}/.rocksdb/edqs}") String path) {
super(path, new Options().setCreateIfMissing(true));
super(path, new Options().setCreateIfMissing(true), new WriteOptions());
}
@PostConstruct
@Override
public void init() {
isNew = !Files.exists(Path.of(path));
super.init();
}
@PreDestroy
@Override
public void close() {
super.close();
}

View File

@ -15,35 +15,43 @@
*/
package org.thingsboard.server.edqs.util;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BiConsumer;
@RequiredArgsConstructor
public class TbRocksDb {
protected final String path;
private final Options options;
private RocksDB db;
private final Options dbOptions;
private final WriteOptions writeOptions;
protected RocksDB db;
static {
RocksDB.loadLibrary();
}
@SneakyThrows
public void init() {
db = RocksDB.open(options, path);
public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) {
this.path = path;
this.dbOptions = dbOptions;
this.writeOptions = writeOptions;
}
public void put(String key, byte[] value) throws RocksDBException {
db.put(key.getBytes(StandardCharsets.UTF_8), value);
@SneakyThrows
public void init() {
Files.createDirectories(Path.of(path).getParent());
db = RocksDB.open(dbOptions, path);
}
@SneakyThrows
public void put(String key, byte[] value) {
db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value);
}
public void forEach(BiConsumer<String, byte[]> processor) {
@ -55,8 +63,9 @@ public class TbRocksDb {
}
}
public void delete(String key) throws RocksDBException {
db.delete(key.getBytes(StandardCharsets.UTF_8));
@SneakyThrows
public void delete(String key) {
db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8));
}
public void close() {