init commit

This commit is contained in:
Dmytro Shvaika 2019-11-19 12:55:50 +02:00 committed by Andrew Shvayka
parent 6af9aa0ecb
commit 33f703c2b8
13 changed files with 353 additions and 104 deletions

View File

@ -16,14 +16,11 @@
package org.thingsboard.server.dao.model.sql;
import lombok.Data;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import javax.persistence.Column;
import javax.persistence.Id;
@ -35,11 +32,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@Data
@MappedSuperclass
public abstract class AbsractTsKvEntity implements ToData<TsKvEntry> {
public abstract class AbsractTsKvEntity {
protected static final String SUM = "SUM";
protected static final String AVG = "AVG";
@ -50,10 +46,6 @@ public abstract class AbsractTsKvEntity implements ToData<TsKvEntry> {
@Column(name = ENTITY_ID_COLUMN)
protected String entityId;
@Id
@Column(name = TS_COLUMN)
protected Long ts;
@Id
@Column(name = KEY_COLUMN)
protected String key;
@ -70,8 +62,7 @@ public abstract class AbsractTsKvEntity implements ToData<TsKvEntry> {
@Column(name = DOUBLE_VALUE_COLUMN)
protected Double doubleValue;
@Override
public TsKvEntry toData() {
protected KvEntry getKvEntry() {
KvEntry kvEntry = null;
if (strValue != null) {
kvEntry = new StringDataEntry(key, strValue);
@ -82,7 +73,7 @@ public abstract class AbsractTsKvEntity implements ToData<TsKvEntry> {
} else if (booleanValue != null) {
kvEntry = new BooleanDataEntry(key, booleanValue);
}
return new BasicTsKvEntry(ts, kvEntry);
return kvEntry;
}
public abstract boolean isNotEmpty();

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sqlts.timescale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
@ -35,6 +36,7 @@ import javax.persistence.SqlResultSetMappings;
import javax.persistence.Table;
import static org.thingsboard.server.dao.model.ModelConstants.TENANT_ID_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG_QUERY;
import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_COUNT;
@ -119,6 +121,10 @@ public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToDa
@Column(name = TENANT_ID_COLUMN)
private String tenantId;
@Id
@Column(name = TS_COLUMN)
protected Long ts;
public TimescaleTsKvEntity() { }
public TimescaleTsKvEntity(Long tsBucket, Long interval, Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String strValue, String aggType) {
@ -181,4 +187,9 @@ public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToDa
public boolean isNotEmpty() {
return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null);
}
@Override
public TsKvEntry toData() {
return new BasicTsKvEntry(ts, getKvEntry());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.model.sqlts.ts;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
@ -30,6 +31,7 @@ import javax.persistence.IdClass;
import javax.persistence.Table;
import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@Data
@Entity
@ -42,6 +44,10 @@ public final class TsKvEntity extends AbsractTsKvEntity implements ToData<TsKvEn
@Column(name = ENTITY_TYPE_COLUMN)
private EntityType entityType;
@Id
@Column(name = TS_COLUMN)
protected Long ts;
public TsKvEntity() {
}
@ -100,9 +106,13 @@ public final class TsKvEntity extends AbsractTsKvEntity implements ToData<TsKvEn
}
}
@Override
public boolean isNotEmpty() {
return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
}
@Override
public TsKvEntry toData() {
return new BasicTsKvEntry(ts, getKvEntry());
}
}

View File

@ -18,13 +18,9 @@ package org.thingsboard.server.dao.model.sqlts.ts;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ToData;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import javax.persistence.Column;
import javax.persistence.Entity;
@ -34,63 +30,30 @@ import javax.persistence.Id;
import javax.persistence.IdClass;
import javax.persistence.Table;
import static org.thingsboard.server.dao.model.ModelConstants.BOOLEAN_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.DOUBLE_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN;
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@Data
@Entity
@Table(name = "ts_kv_latest")
@IdClass(TsKvLatestCompositeKey.class)
public final class TsKvLatestEntity implements ToData<TsKvEntry> {
public final class TsKvLatestEntity extends AbsractTsKvEntity implements ToData<TsKvEntry> {
//TODO: reafctor this and TsKvEntity to avoid code duplicates
@Id
@Enumerated(EnumType.STRING)
@Column(name = ENTITY_TYPE_COLUMN)
private EntityType entityType;
@Id
@Column(name = ENTITY_ID_COLUMN)
private String entityId;
@Id
@Column(name = KEY_COLUMN)
private String key;
@Column(name = TS_COLUMN)
private long ts;
@Column(name = BOOLEAN_VALUE_COLUMN)
private Boolean booleanValue;
@Column(name = STRING_VALUE_COLUMN)
private String strValue;
@Column(name = LONG_VALUE_COLUMN)
private Long longValue;
@Column(name = DOUBLE_VALUE_COLUMN)
private Double doubleValue;
@Override
public TsKvEntry toData() {
KvEntry kvEntry = null;
if (strValue != null) {
kvEntry = new StringDataEntry(key, strValue);
} else if (longValue != null) {
kvEntry = new LongDataEntry(key, longValue);
} else if (doubleValue != null) {
kvEntry = new DoubleDataEntry(key, doubleValue);
} else if (booleanValue != null) {
kvEntry = new BooleanDataEntry(key, booleanValue);
}
return new BasicTsKvEntry(ts, kvEntry);
return new BasicTsKvEntry(ts, getKvEntry());
}
@Override
public boolean isNotEmpty() {
return strValue != null || longValue != null || doubleValue != null || booleanValue != null;
}
}

View File

@ -0,0 +1,74 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
import org.springframework.stereotype.Repository;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@Repository
public abstract class AbstractInsertRepository {
protected static final String BOOL_V = "bool_v";
protected static final String STR_V = "str_v";
protected static final String LONG_V = "long_v";
protected static final String DBL_V = "dbl_v";
protected static final String TS_KV_LATEST_TABLE = "ts_kv_latest";
protected static final String TS_KV_TABLE = "ts_kv";
protected static final String HSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, BOOL_V);
protected static final String HSQL_ON_STR_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, STR_V);
protected static final String HSQL_ON_LONG_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, LONG_V);
protected static final String HSQL_ON_DBL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, DBL_V);
protected static final String HSQL_LATEST_ON_BOOL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, BOOL_V);
protected static final String HSQL_LATEST_ON_STR_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, STR_V);
protected static final String HSQL_LATEST_ON_LONG_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, LONG_V);
protected static final String HSQL_LATEST_ON_DBL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, DBL_V);
protected static final String PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null";
protected static final String PSQL_ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null";
protected static final String PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null";
protected static final String PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null";
@PersistenceContext
protected EntityManager entityManager;
protected static String getInsertOrUpdateStringHsql(String tableName, String constraint, String value, String nullValues) {
return "MERGE INTO " + tableName + " USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON " + constraint + " WHEN MATCHED THEN UPDATE SET " + tableName + "." + value + " = A." + value + ", " + tableName + ".ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")";
}
protected static String getInsertOrUpdateStringPsql(String tableName, String constraint, String value, String nullValues) {
return "INSERT INTO " + tableName + " (entity_type, entity_id, key, ts, " + value + ") VALUES (:entity_type, :entity_id, :key, :ts, :" + value + ") ON CONFLICT " + constraint + " DO UPDATE SET " + value + " = :" + value + ", ts = :ts," + nullValues;
}
private static String getHsqlNullValues(String tableName, String notNullValue) {
switch (notNullValue) {
case BOOL_V:
return " " + tableName + ".str_v = null, " + tableName + ".long_v = null, " + tableName + ".dbl_v = null ";
case STR_V:
return " " + tableName + ".bool_v = null, " + tableName + ".long_v = null, " + tableName + ".dbl_v = null ";
case LONG_V:
return " " + tableName + ".str_v = null, " + tableName + ".bool_v = null, " + tableName + ".dbl_v = null ";
case DBL_V:
return " " + tableName + ".str_v = null, " + tableName + ".long_v = null, " + tableName + ".bool_v = null ";
default:
throw new RuntimeException("Unsupported insert value: [" + notNullValue + "]");
}
}
}

View File

@ -0,0 +1,54 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
@Repository
public abstract class AbstractLatestInsertRepository extends AbstractInsertRepository {
public abstract void saveOrUpdate(TsKvLatestEntity entity);
protected void processSaveOrUpdate(TsKvLatestEntity entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) {
if (entity.getBooleanValue() != null) {
saveOrUpdateBoolean(entity, requestBoolValue);
}
if (entity.getStrValue() != null) {
saveOrUpdateString(entity, requestStrValue);
}
if (entity.getLongValue() != null) {
saveOrUpdateLong(entity, requestLongValue);
}
if (entity.getDoubleValue() != null) {
saveOrUpdateDouble(entity, requestDblValue);
}
}
@Modifying
protected abstract void saveOrUpdateBoolean(TsKvLatestEntity entity, String query);
@Modifying
protected abstract void saveOrUpdateString(TsKvLatestEntity entity, String query);
@Modifying
protected abstract void saveOrUpdateLong(TsKvLatestEntity entity, String query);
@Modifying
protected abstract void saveOrUpdateDouble(TsKvLatestEntity entity, String query);
}

View File

@ -19,19 +19,8 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@Repository
public abstract class AbstractTimeseriesInsertRepository<T extends AbsractTsKvEntity> {
protected static final String BOOL_V = "bool_v";
protected static final String STR_V = "str_v";
protected static final String LONG_V = "long_v";
protected static final String DBL_V = "dbl_v";
@PersistenceContext
protected EntityManager entityManager;
public abstract class AbstractTimeseriesInsertRepository<T extends AbsractTsKvEntity> extends AbstractInsertRepository {
public abstract void saveOrUpdate(T entity);

View File

@ -28,15 +28,10 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao;
@Transactional
public class TimescaleInsertRepository extends AbstractTimeseriesInsertRepository<TimescaleTsKvEntity> {
private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null";
private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null";
private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null";
private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null";
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
@Override
public void saveOrUpdate(TimescaleTsKvEntity entity) {

View File

@ -0,0 +1,86 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.ts;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.util.HsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@SqlTsDao
@HsqlDao
@Repository
@Transactional
public class HsqlLatestInsertRepository extends AbstractLatestInsertRepository {
private static final String TS_KV_LATEST_CONSTRAINT = "(ts_kv_latest.entity_type=A.entity_type AND ts_kv_latest.entity_id=A.entity_id AND ts_kv_latest.key=A.key)";
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, BOOL_V, HSQL_LATEST_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, STR_V, HSQL_LATEST_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, HSQL_LATEST_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, HSQL_LATEST_ON_DBL_VALUE_UPDATE_SET_NULLS);
@Override
public void saveOrUpdate(TsKvLatestEntity entity) {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("bool_v", entity.getBooleanValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateString(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("str_v", entity.getStrValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateLong(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("long_v", entity.getLongValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateDouble(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("dbl_v", entity.getDoubleValue())
.executeUpdate();
}
}

View File

@ -28,19 +28,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
@Transactional
public class HsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepository<TsKvEntity> {
private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.long_v = null, ts_kv.dbl_v = null ";
private static final String ON_STR_VALUE_UPDATE_SET_NULLS = " ts_kv.bool_v = null, ts_kv.long_v = null, ts_kv.dbl_v = null ";
private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.bool_v = null, ts_kv.dbl_v = null ";
private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.long_v = null, ts_kv.bool_v = null ";
private static final String TS_KV_CONSTRAINT = "(ts_kv.entity_type=A.entity_type AND ts_kv.entity_id=A.entity_id AND ts_kv.key=A.key AND ts_kv.ts=A.ts)";
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS);
private static String getInsertOrUpdateString(String value, String nullValues) {
return "MERGE INTO ts_kv USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON (ts_kv.entity_type=A.entity_type AND ts_kv.entity_id=A.entity_id AND ts_kv.key=A.key AND ts_kv.ts=A.ts) WHEN MATCHED THEN UPDATE SET ts_kv." + value + " = A." + value + ", ts_kv.ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")";
}
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, BOOL_V, HSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, STR_V, HSQL_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V , HSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, HSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
@Override
public void saveOrUpdate(TsKvEntity entity) {

View File

@ -38,6 +38,7 @@ import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao;
import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository;
import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
@ -69,6 +70,9 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
@Autowired
private AbstractTimeseriesInsertRepository insertRepository;
@Autowired
private AbstractLatestInsertRepository insertLatestRepository;
@Override
public ListenableFuture<List<TsKvEntry>> findAllAsync(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries) {
return processFindAllAsync(tenantId, entityId, queries);
@ -285,7 +289,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese
latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null));
latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
return insertService.submit(() -> {
tsKvLatestRepository.save(latestEntity);
insertLatestRepository.saveOrUpdate(latestEntity);
return null;
});
}

View File

@ -0,0 +1,86 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sqlts.ts;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity;
import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository;
import org.thingsboard.server.dao.util.PsqlDao;
import org.thingsboard.server.dao.util.SqlTsDao;
@SqlTsDao
@PsqlDao
@Repository
@Transactional
public class PsqlLatestInsertRepository extends AbstractLatestInsertRepository {
private static final String TS_KV_LATEST_CONSTRAINT = "(entity_type, entity_id, key)";
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
@Override
public void saveOrUpdate(TsKvLatestEntity entity) {
processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT);
}
@Override
protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("bool_v", entity.getBooleanValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateString(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("str_v", entity.getStrValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateLong(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("long_v", entity.getLongValue())
.executeUpdate();
}
@Override
protected void saveOrUpdateDouble(TsKvLatestEntity entity, String query) {
entityManager.createNativeQuery(query)
.setParameter("entity_type", entity.getEntityType().name())
.setParameter("entity_id", entity.getEntityId())
.setParameter("key", entity.getKey())
.setParameter("ts", entity.getTs())
.setParameter("dbl_v", entity.getDoubleValue())
.executeUpdate();
}
}

View File

@ -28,19 +28,12 @@ import org.thingsboard.server.dao.util.SqlTsDao;
@Transactional
public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepository<TsKvEntity> {
private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null";
private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null";
private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null";
private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null";
private static final String TS_KV_CONSTRAINT = "(entity_type, entity_id, key, ts)";
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS);
private static String getInsertOrUpdateString(String value, String nullValues) {
return "INSERT INTO ts_kv (entity_type, entity_id, key, ts, " + value + ") VALUES (:entity_type, :entity_id, :key, :ts, :" + value + ") ON CONFLICT (entity_type, entity_id, key, ts) DO UPDATE SET " + value + " = :" + value + ", ts = :ts," + nullValues;
}
private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS);
private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS);
@Override
public void saveOrUpdate(TsKvEntity entity) {