ranamed seq_number to version and refactoring
This commit is contained in:
parent
b666f9499a
commit
4807a7cbdd
@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import io.swagger.v3.oas.annotations.Parameter;
|
import io.swagger.v3.oas.annotations.Parameter;
|
||||||
import io.swagger.v3.oas.annotations.media.ArraySchema;
|
|
||||||
import io.swagger.v3.oas.annotations.media.Content;
|
import io.swagger.v3.oas.annotations.media.Content;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||||
@ -31,7 +30,6 @@ import org.springframework.http.HttpStatus;
|
|||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.security.access.prepost.PreAuthorize;
|
import org.springframework.security.access.prepost.PreAuthorize;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
import org.springframework.web.bind.annotation.PostMapping;
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
@ -490,7 +488,7 @@ public class DeviceController extends BaseController {
|
|||||||
@RequestMapping(value = "/devices", params = {"deviceIds"}, method = RequestMethod.GET)
|
@RequestMapping(value = "/devices", params = {"deviceIds"}, method = RequestMethod.GET)
|
||||||
@ResponseBody
|
@ResponseBody
|
||||||
public List<Device> getDevicesByIds(
|
public List<Device> getDevicesByIds(
|
||||||
@Parameter(description = "A list of devices ids, separated by comma ','", array = @ArraySchema(schema = @Schema(type = "string")))
|
@Parameter(description = "A list of devices ids, separated by comma ','")
|
||||||
@RequestParam("deviceIds") String[] strDeviceIds) throws ThingsboardException, ExecutionException, InterruptedException {
|
@RequestParam("deviceIds") String[] strDeviceIds) throws ThingsboardException, ExecutionException, InterruptedException {
|
||||||
checkArrayParameter("deviceIds", strDeviceIds);
|
checkArrayParameter("deviceIds", strDeviceIds);
|
||||||
SecurityUser user = getCurrentUser();
|
SecurityUser user = getCurrentUser();
|
||||||
|
|||||||
@ -29,9 +29,9 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public abstract class AbstractSequenceInsertRepository<T> extends AbstractInsertRepository {
|
public abstract class AbstractVersionedInsertRepository<T> extends AbstractInsertRepository {
|
||||||
|
|
||||||
public static final String SEQ_NUMBER = "seq_number";
|
public static final String VERSION_COLUMN = "version";
|
||||||
|
|
||||||
public List<Long> saveOrUpdate(List<T> entities) {
|
public List<Long> saveOrUpdate(List<T> entities) {
|
||||||
return transactionTemplate.execute(status -> {
|
return transactionTemplate.execute(status -> {
|
||||||
@ -54,7 +54,7 @@ public abstract class AbstractSequenceInsertRepository<T> extends AbstractInsert
|
|||||||
seqNumbers.add(0L);
|
seqNumbers.add(0L);
|
||||||
toInsertIndexes.add(i);
|
toInsertIndexes.add(i);
|
||||||
} else {
|
} else {
|
||||||
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(SEQ_NUMBER));
|
seqNumbers.add((Long) seqNumbersList.get(keyHolderIndex).get(VERSION_COLUMN));
|
||||||
keyHolderIndex++;
|
keyHolderIndex++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -68,7 +68,7 @@ public abstract class AbstractSequenceInsertRepository<T> extends AbstractInsert
|
|||||||
seqNumbersList = keyHolder.getKeyList();
|
seqNumbersList = keyHolder.getKeyList();
|
||||||
|
|
||||||
for (int i = 0; i < seqNumbersList.size(); i++) {
|
for (int i = 0; i < seqNumbersList.size(); i++) {
|
||||||
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(SEQ_NUMBER));
|
seqNumbers.set(toInsertIndexes.get(i), (Long) seqNumbersList.get(i).get(VERSION_COLUMN));
|
||||||
}
|
}
|
||||||
|
|
||||||
return seqNumbers;
|
return seqNumbers;
|
||||||
@ -113,7 +113,7 @@ public abstract class AbstractSequenceInsertRepository<T> extends AbstractInsert
|
|||||||
|
|
||||||
private record SequencePreparedStatementCreator(String sql) implements PreparedStatementCreator, SqlProvider {
|
private record SequencePreparedStatementCreator(String sql) implements PreparedStatementCreator, SqlProvider {
|
||||||
|
|
||||||
private static final String[] COLUMNS = {SEQ_NUMBER};
|
private static final String[] COLUMNS = {VERSION_COLUMN};
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
|
public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
|
||||||
@ -17,7 +17,7 @@ package org.thingsboard.server.dao.sql.attributes;
|
|||||||
|
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.thingsboard.server.dao.AbstractSequenceInsertRepository;
|
import org.thingsboard.server.dao.AbstractVersionedInsertRepository;
|
||||||
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
|
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
|
||||||
import org.thingsboard.server.dao.util.SqlDao;
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
|
|
||||||
@ -29,16 +29,16 @@ import java.util.List;
|
|||||||
@Repository
|
@Repository
|
||||||
@Transactional
|
@Transactional
|
||||||
@SqlDao
|
@SqlDao
|
||||||
public class AttributeKvInsertRepository extends AbstractSequenceInsertRepository<AttributeKvEntity> {
|
public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository<AttributeKvEntity> {
|
||||||
|
|
||||||
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') " +
|
private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " +
|
||||||
"WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING seq_number;";
|
"WHERE entity_id = ? and attribute_type =? and attribute_key = ? RETURNING version;";
|
||||||
|
|
||||||
private static final String INSERT_OR_UPDATE =
|
private static final String INSERT_OR_UPDATE =
|
||||||
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, seq_number) " +
|
"INSERT INTO attribute_kv (entity_id, attribute_type, attribute_key, str_v, long_v, dbl_v, bool_v, json_v, last_update_ts, version) " +
|
||||||
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_latest_seq')) " +
|
"VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, nextval('attribute_kv_version_seq')) " +
|
||||||
"ON CONFLICT (entity_id, attribute_type, attribute_key) " +
|
"ON CONFLICT (entity_id, attribute_type, attribute_key) " +
|
||||||
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, seq_number = nextval('attribute_kv_latest_seq') RETURNING seq_number;";
|
"DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') RETURNING version;";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List<AttributeKvEntity> entities) throws SQLException {
|
protected void setOnBatchUpdateValues(PreparedStatement ps, int i, List<AttributeKvEntity> entities) throws SQLException {
|
||||||
|
|||||||
@ -19,7 +19,7 @@ import jakarta.annotation.PostConstruct;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.thingsboard.server.dao.AbstractSequenceInsertRepository;
|
import org.thingsboard.server.dao.AbstractVersionedInsertRepository;
|
||||||
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
|
||||||
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
||||||
import org.thingsboard.server.dao.util.SqlDao;
|
import org.thingsboard.server.dao.util.SqlDao;
|
||||||
@ -35,23 +35,23 @@ import java.util.List;
|
|||||||
@Repository
|
@Repository
|
||||||
@Transactional
|
@Transactional
|
||||||
@SqlDao
|
@SqlDao
|
||||||
public class SqlLatestInsertTsRepository extends AbstractSequenceInsertRepository<TsKvLatestEntity> implements InsertLatestTsRepository {
|
public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository<TsKvLatestEntity> implements InsertLatestTsRepository {
|
||||||
|
|
||||||
@Value("${sql.ts_latest.update_by_latest_ts:true}")
|
@Value("${sql.ts_latest.update_by_latest_ts:true}")
|
||||||
private Boolean updateByLatestTs;
|
private Boolean updateByLatestTs;
|
||||||
|
|
||||||
private static final String BATCH_UPDATE =
|
private static final String BATCH_UPDATE =
|
||||||
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq') WHERE entity_id = ? AND key = ?";
|
"UPDATE ts_kv_latest SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq') WHERE entity_id = ? AND key = ?";
|
||||||
|
|
||||||
private static final String INSERT_OR_UPDATE =
|
private static final String INSERT_OR_UPDATE =
|
||||||
"INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, seq_number) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_seq')) " +
|
"INSERT INTO ts_kv_latest (entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, version) VALUES(?, ?, ?, ?, ?, ?, ?, cast(? AS json), nextval('ts_kv_latest_version_seq')) " +
|
||||||
"ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), seq_number = nextval('ts_kv_latest_seq')";
|
"ON CONFLICT (entity_id, key) DO UPDATE SET ts = ?, bool_v = ?, str_v = ?, long_v = ?, dbl_v = ?, json_v = cast(? AS json), version = nextval('ts_kv_latest_version_seq')";
|
||||||
|
|
||||||
private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?";
|
private static final String BATCH_UPDATE_BY_LATEST_TS = BATCH_UPDATE + " AND ts_kv_latest.ts <= ?";
|
||||||
|
|
||||||
private static final String INSERT_OR_UPDATE_BY_LATEST_TS = INSERT_OR_UPDATE + " WHERE ts_kv_latest.ts <= ?";
|
private static final String INSERT_OR_UPDATE_BY_LATEST_TS = INSERT_OR_UPDATE + " WHERE ts_kv_latest.ts <= ?";
|
||||||
|
|
||||||
private static final String RETURNING = " RETURNING seq_number";
|
private static final String RETURNING = " RETURNING version";
|
||||||
|
|
||||||
private String batchUpdateQuery;
|
private String batchUpdateQuery;
|
||||||
private String insertOrUpdateQuery;
|
private String insertOrUpdateQuery;
|
||||||
|
|||||||
@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS audit_log (
|
|||||||
action_failure_details varchar(1000000)
|
action_failure_details varchar(1000000)
|
||||||
) PARTITION BY RANGE (created_time);
|
) PARTITION BY RANGE (created_time);
|
||||||
|
|
||||||
CREATE SEQUENCE IF NOT EXISTS attribute_kv_latest_seq cache 1000;
|
CREATE SEQUENCE IF NOT EXISTS attribute_kv_version_seq cache 1000;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS attribute_kv (
|
CREATE TABLE IF NOT EXISTS attribute_kv (
|
||||||
entity_id uuid,
|
entity_id uuid,
|
||||||
@ -114,7 +114,7 @@ CREATE TABLE IF NOT EXISTS attribute_kv (
|
|||||||
dbl_v double precision,
|
dbl_v double precision,
|
||||||
json_v json,
|
json_v json,
|
||||||
last_update_ts bigint,
|
last_update_ts bigint,
|
||||||
seq_number bigint,
|
version bigint,
|
||||||
CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key)
|
CONSTRAINT attribute_kv_pkey PRIMARY KEY (entity_id, attribute_type, attribute_key)
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -541,7 +541,7 @@ CREATE TABLE IF NOT EXISTS entity_view (
|
|||||||
CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id)
|
CONSTRAINT entity_view_external_id_unq_key UNIQUE (tenant_id, external_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_seq cache 1000;
|
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS ts_kv_latest
|
CREATE TABLE IF NOT EXISTS ts_kv_latest
|
||||||
(
|
(
|
||||||
@ -553,7 +553,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
|
|||||||
long_v bigint,
|
long_v bigint,
|
||||||
dbl_v double precision,
|
dbl_v double precision,
|
||||||
json_v json,
|
json_v json,
|
||||||
seq_number bigint,
|
version bigint,
|
||||||
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -34,6 +34,8 @@ CREATE TABLE IF NOT EXISTS key_dictionary (
|
|||||||
CONSTRAINT key_dictionary_id_pkey PRIMARY KEY (key)
|
CONSTRAINT key_dictionary_id_pkey PRIMARY KEY (key)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
||||||
entity_id uuid NOT NULL,
|
entity_id uuid NOT NULL,
|
||||||
key int NOT NULL,
|
key int NOT NULL,
|
||||||
@ -43,6 +45,7 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
|||||||
long_v bigint,
|
long_v bigint,
|
||||||
dbl_v double precision,
|
dbl_v double precision,
|
||||||
json_v json,
|
json_v json,
|
||||||
|
version bigint,
|
||||||
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -14,6 +14,8 @@
|
|||||||
-- limitations under the License.
|
-- limitations under the License.
|
||||||
--
|
--
|
||||||
|
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS ts_kv_latest_version_seq cache 1000;
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS ts_kv_latest
|
CREATE TABLE IF NOT EXISTS ts_kv_latest
|
||||||
(
|
(
|
||||||
entity_id uuid NOT NULL,
|
entity_id uuid NOT NULL,
|
||||||
@ -24,12 +26,6 @@ CREATE TABLE IF NOT EXISTS ts_kv_latest
|
|||||||
long_v bigint,
|
long_v bigint,
|
||||||
dbl_v double precision,
|
dbl_v double precision,
|
||||||
json_v json,
|
json_v json,
|
||||||
|
version bigint,
|
||||||
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS ts_kv_dictionary
|
|
||||||
(
|
|
||||||
key varchar(255) NOT NULL,
|
|
||||||
key_id serial UNIQUE,
|
|
||||||
CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
|
|
||||||
);
|
|
||||||
@ -23,6 +23,7 @@ DROP TABLE IF EXISTS alarm_type;
|
|||||||
DROP TABLE IF EXISTS asset;
|
DROP TABLE IF EXISTS asset;
|
||||||
DROP TABLE IF EXISTS audit_log;
|
DROP TABLE IF EXISTS audit_log;
|
||||||
DROP TABLE IF EXISTS attribute_kv;
|
DROP TABLE IF EXISTS attribute_kv;
|
||||||
|
DROP SEQUENCE IF EXISTS attribute_kv_version_seq;
|
||||||
DROP TABLE IF EXISTS component_descriptor;
|
DROP TABLE IF EXISTS component_descriptor;
|
||||||
DROP TABLE IF EXISTS customer;
|
DROP TABLE IF EXISTS customer;
|
||||||
DROP TABLE IF EXISTS device;
|
DROP TABLE IF EXISTS device;
|
||||||
@ -37,7 +38,7 @@ DROP TABLE IF EXISTS tenant;
|
|||||||
DROP TABLE IF EXISTS ts_kv;
|
DROP TABLE IF EXISTS ts_kv;
|
||||||
DROP TABLE IF EXISTS ts_kv_latest;
|
DROP TABLE IF EXISTS ts_kv_latest;
|
||||||
DROP TABLE IF EXISTS ts_kv_dictionary;
|
DROP TABLE IF EXISTS ts_kv_dictionary;
|
||||||
DROP SEQUENCE IF EXISTS ts_kv_latest_seq;
|
DROP SEQUENCE IF EXISTS ts_kv_latest_version_seq;
|
||||||
DROP TABLE IF EXISTS user_credentials;
|
DROP TABLE IF EXISTS user_credentials;
|
||||||
DROP TABLE IF EXISTS widgets_bundle_widget;
|
DROP TABLE IF EXISTS widgets_bundle_widget;
|
||||||
DROP TABLE IF EXISTS widget_type;
|
DROP TABLE IF EXISTS widget_type;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user