diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java b/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java index cb6d3f0d6b..f95227bc24 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CfRocksDb.java @@ -15,22 +15,29 @@ */ package org.thingsboard.server.service.cf; +import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.rocksdb.Options; import org.rocksdb.WriteOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; -import org.thingsboard.server.utils.TbRocksDb; +import org.thingsboard.server.edqs.util.TbRocksDb; @Component @ConditionalOnExpression("'${queue.type:null}'=='in-memory'") public class CfRocksDb extends TbRocksDb { - public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) throws Exception { + public CfRocksDb(@Value("${queue.calculated_fields.rocks_db_path:${user.home}/.rocksdb/cf_states}") String path) { super(path, new Options().setCreateIfMissing(true), new WriteOptions().setSync(true)); } + @PostConstruct + @Override + public void init() { + super.init(); + } + @PreDestroy @Override public void close() { diff --git a/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java b/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java deleted file mode 100644 index 5c4fe185e5..0000000000 --- a/application/src/main/java/org/thingsboard/server/utils/TbRocksDb.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.utils; - -import lombok.SneakyThrows; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksIterator; -import org.rocksdb.WriteOptions; - -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.function.BiConsumer; - -public class TbRocksDb { - - protected final String path; - private final WriteOptions writeOptions; - protected final RocksDB db; - - static { - RocksDB.loadLibrary(); - } - - public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) throws Exception { - this.path = path; - this.writeOptions = writeOptions; - Files.createDirectories(Path.of(path).getParent()); - this.db = RocksDB.open(dbOptions, path); - } - - @SneakyThrows - public void put(String key, byte[] value) { - db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value); - } - - public void forEach(BiConsumer processor) { - try (RocksIterator iterator = db.newIterator()) { - for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) { - String key = new String(iterator.key(), StandardCharsets.UTF_8); - processor.accept(key, iterator.value()); - } - } - } - - @SneakyThrows - public void delete(String key) { - db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8)); - } - - public void close() { - if (db != null) { - db.close(); - } - } - -} \ No newline at end of file diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsRocksDb.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsRocksDb.java index ae1436e6ca..4a991432c7 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsRocksDb.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsRocksDb.java @@ -19,6 +19,7 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Getter; import org.rocksdb.Options; +import org.rocksdb.WriteOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.queue.edqs.InMemoryEdqsComponent; @@ -34,16 +35,18 @@ public class EdqsRocksDb extends TbRocksDb { private boolean isNew; public EdqsRocksDb(@Value("${queue.edqs.local.rocksdb_path:${user.home}/.rocksdb/edqs}") String path) { - super(path, new Options().setCreateIfMissing(true)); + super(path, new Options().setCreateIfMissing(true), new WriteOptions()); } @PostConstruct + @Override public void init() { isNew = !Files.exists(Path.of(path)); super.init(); } @PreDestroy + @Override public void close() { super.close(); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/TbRocksDb.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/TbRocksDb.java index cd5ba0c9d9..23f2fa2c9e 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/TbRocksDb.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/TbRocksDb.java @@ -15,35 +15,43 @@ */ package org.thingsboard.server.edqs.util; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.rocksdb.Options; import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.function.BiConsumer; -@RequiredArgsConstructor public class TbRocksDb { protected final String path; - private final Options options; - - private RocksDB db; + private final Options dbOptions; + private final WriteOptions writeOptions; + protected RocksDB db; static { RocksDB.loadLibrary(); } - @SneakyThrows - public void init() { - db = RocksDB.open(options, path); + public TbRocksDb(String path, Options dbOptions, WriteOptions writeOptions) { + this.path = path; + this.dbOptions = dbOptions; + this.writeOptions = writeOptions; } - public void put(String key, byte[] value) throws RocksDBException { - db.put(key.getBytes(StandardCharsets.UTF_8), value); + @SneakyThrows + public void init() { + Files.createDirectories(Path.of(path).getParent()); + db = RocksDB.open(dbOptions, path); + } + + @SneakyThrows + public void put(String key, byte[] value) { + db.put(writeOptions, key.getBytes(StandardCharsets.UTF_8), value); } public void forEach(BiConsumer processor) { @@ -55,8 +63,9 @@ public class TbRocksDb { } } - public void delete(String key) throws RocksDBException { - db.delete(key.getBytes(StandardCharsets.UTF_8)); + @SneakyThrows + public void delete(String key) { + db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8)); } public void close() {