diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java index 2773e5e4ab..d8c0e4ef0a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/AbsractTsKvEntity.java @@ -16,14 +16,11 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.dao.model.ToData; import javax.persistence.Column; import javax.persistence.Id; @@ -35,11 +32,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_ID_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.KEY_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.LONG_VALUE_COLUMN; import static org.thingsboard.server.dao.model.ModelConstants.STRING_VALUE_COLUMN; -import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @Data @MappedSuperclass -public abstract class AbsractTsKvEntity implements ToData { +public abstract class AbsractTsKvEntity { protected static final String SUM = "SUM"; protected static final String AVG = "AVG"; @@ -50,10 +46,6 @@ public abstract class AbsractTsKvEntity implements ToData { @Column(name = ENTITY_ID_COLUMN) protected String entityId; - @Id - @Column(name = TS_COLUMN) - protected Long ts; - @Id @Column(name = KEY_COLUMN) protected String key; @@ -70,8 +62,7 @@ public abstract class AbsractTsKvEntity implements ToData { @Column(name = DOUBLE_VALUE_COLUMN) protected Double doubleValue; - @Override - public TsKvEntry toData() { + protected KvEntry getKvEntry() { KvEntry kvEntry = null; if (strValue != null) { kvEntry = new StringDataEntry(key, strValue); @@ -82,7 +73,7 @@ public abstract class AbsractTsKvEntity implements ToData { } else if (booleanValue != null) { kvEntry = new BooleanDataEntry(key, booleanValue); } - return new BasicTsKvEntry(ts, kvEntry); + return kvEntry; } public abstract boolean isNotEmpty(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java index fa212dd2f9..3427c928f4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/timescale/TimescaleTsKvEntity.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.model.sqlts.timescale; import lombok.Data; import lombok.EqualsAndHashCode; import org.springframework.util.StringUtils; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ToData; import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; @@ -35,6 +36,7 @@ import javax.persistence.SqlResultSetMappings; import javax.persistence.Table; import static org.thingsboard.server.dao.model.ModelConstants.TENANT_ID_COLUMN; +import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_AVG_QUERY; import static org.thingsboard.server.dao.sqlts.timescale.AggregationRepository.FIND_COUNT; @@ -119,6 +121,10 @@ public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToDa @Column(name = TENANT_ID_COLUMN) private String tenantId; + @Id + @Column(name = TS_COLUMN) + protected Long ts; + public TimescaleTsKvEntity() { } public TimescaleTsKvEntity(Long tsBucket, Long interval, Long longValue, Double doubleValue, Long longCountValue, Long doubleCountValue, String strValue, String aggType) { @@ -181,4 +187,9 @@ public final class TimescaleTsKvEntity extends AbsractTsKvEntity implements ToDa public boolean isNotEmpty() { return ts != null && (strValue != null || longValue != null || doubleValue != null || booleanValue != null); } + + @Override + public TsKvEntry toData() { + return new BasicTsKvEntry(ts, getKvEntry()); + } } \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java index 4440a3dd0c..c5b9237f13 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/ts/TsKvEntity.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.model.sqlts.ts; import lombok.Data; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ToData; import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; @@ -30,6 +31,7 @@ import javax.persistence.IdClass; import javax.persistence.Table; import static org.thingsboard.server.dao.model.ModelConstants.ENTITY_TYPE_COLUMN; +import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @Data @Entity @@ -42,6 +44,10 @@ public final class TsKvEntity extends AbsractTsKvEntity implements ToData { +public final class TsKvLatestEntity extends AbsractTsKvEntity implements ToData { - - //TODO: reafctor this and TsKvEntity to avoid code duplicates @Id @Enumerated(EnumType.STRING) @Column(name = ENTITY_TYPE_COLUMN) private EntityType entityType; - @Id - @Column(name = ENTITY_ID_COLUMN) - private String entityId; - - @Id - @Column(name = KEY_COLUMN) - private String key; - @Column(name = TS_COLUMN) private long ts; - @Column(name = BOOLEAN_VALUE_COLUMN) - private Boolean booleanValue; - - @Column(name = STRING_VALUE_COLUMN) - private String strValue; - - @Column(name = LONG_VALUE_COLUMN) - private Long longValue; - - @Column(name = DOUBLE_VALUE_COLUMN) - private Double doubleValue; - @Override public TsKvEntry toData() { - KvEntry kvEntry = null; - if (strValue != null) { - kvEntry = new StringDataEntry(key, strValue); - } else if (longValue != null) { - kvEntry = new LongDataEntry(key, longValue); - } else if (doubleValue != null) { - kvEntry = new DoubleDataEntry(key, doubleValue); - } else if (booleanValue != null) { - kvEntry = new BooleanDataEntry(key, booleanValue); - } - return new BasicTsKvEntry(ts, kvEntry); + return new BasicTsKvEntry(ts, getKvEntry()); + } + + @Override + public boolean isNotEmpty() { + return strValue != null || longValue != null || doubleValue != null || booleanValue != null; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java new file mode 100644 index 0000000000..bf647ada38 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractInsertRepository.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import org.springframework.stereotype.Repository; + +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; + +@Repository +public abstract class AbstractInsertRepository { + + protected static final String BOOL_V = "bool_v"; + protected static final String STR_V = "str_v"; + protected static final String LONG_V = "long_v"; + protected static final String DBL_V = "dbl_v"; + + protected static final String TS_KV_LATEST_TABLE = "ts_kv_latest"; + protected static final String TS_KV_TABLE = "ts_kv"; + + protected static final String HSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, BOOL_V); + protected static final String HSQL_ON_STR_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, STR_V); + protected static final String HSQL_ON_LONG_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, LONG_V); + protected static final String HSQL_ON_DBL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_TABLE, DBL_V); + + protected static final String HSQL_LATEST_ON_BOOL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, BOOL_V); + protected static final String HSQL_LATEST_ON_STR_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, STR_V); + protected static final String HSQL_LATEST_ON_LONG_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, LONG_V); + protected static final String HSQL_LATEST_ON_DBL_VALUE_UPDATE_SET_NULLS = getHsqlNullValues(TS_KV_LATEST_TABLE, DBL_V); + + protected static final String PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null"; + protected static final String PSQL_ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null"; + protected static final String PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null"; + protected static final String PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null"; + + @PersistenceContext + protected EntityManager entityManager; + + protected static String getInsertOrUpdateStringHsql(String tableName, String constraint, String value, String nullValues) { + return "MERGE INTO " + tableName + " USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON " + constraint + " WHEN MATCHED THEN UPDATE SET " + tableName + "." + value + " = A." + value + ", " + tableName + ".ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")"; + } + + protected static String getInsertOrUpdateStringPsql(String tableName, String constraint, String value, String nullValues) { + return "INSERT INTO " + tableName + " (entity_type, entity_id, key, ts, " + value + ") VALUES (:entity_type, :entity_id, :key, :ts, :" + value + ") ON CONFLICT " + constraint + " DO UPDATE SET " + value + " = :" + value + ", ts = :ts," + nullValues; + } + + private static String getHsqlNullValues(String tableName, String notNullValue) { + switch (notNullValue) { + case BOOL_V: + return " " + tableName + ".str_v = null, " + tableName + ".long_v = null, " + tableName + ".dbl_v = null "; + case STR_V: + return " " + tableName + ".bool_v = null, " + tableName + ".long_v = null, " + tableName + ".dbl_v = null "; + case LONG_V: + return " " + tableName + ".str_v = null, " + tableName + ".bool_v = null, " + tableName + ".dbl_v = null "; + case DBL_V: + return " " + tableName + ".str_v = null, " + tableName + ".long_v = null, " + tableName + ".bool_v = null "; + default: + throw new RuntimeException("Unsupported insert value: [" + notNullValue + "]"); + } + } +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java new file mode 100644 index 0000000000..a31b0e395b --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractLatestInsertRepository.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts; + +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.stereotype.Repository; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; + +@Repository +public abstract class AbstractLatestInsertRepository extends AbstractInsertRepository { + + public abstract void saveOrUpdate(TsKvLatestEntity entity); + + protected void processSaveOrUpdate(TsKvLatestEntity entity, String requestBoolValue, String requestStrValue, String requestLongValue, String requestDblValue) { + if (entity.getBooleanValue() != null) { + saveOrUpdateBoolean(entity, requestBoolValue); + } + if (entity.getStrValue() != null) { + saveOrUpdateString(entity, requestStrValue); + } + if (entity.getLongValue() != null) { + saveOrUpdateLong(entity, requestLongValue); + } + if (entity.getDoubleValue() != null) { + saveOrUpdateDouble(entity, requestDblValue); + } + } + + @Modifying + protected abstract void saveOrUpdateBoolean(TsKvLatestEntity entity, String query); + + @Modifying + protected abstract void saveOrUpdateString(TsKvLatestEntity entity, String query); + + @Modifying + protected abstract void saveOrUpdateLong(TsKvLatestEntity entity, String query); + + @Modifying + protected abstract void saveOrUpdateDouble(TsKvLatestEntity entity, String query); + +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java index 608933c3f7..6f1b9b1ed3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractTimeseriesInsertRepository.java @@ -19,19 +19,8 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.stereotype.Repository; import org.thingsboard.server.dao.model.sql.AbsractTsKvEntity; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; - @Repository -public abstract class AbstractTimeseriesInsertRepository { - - protected static final String BOOL_V = "bool_v"; - protected static final String STR_V = "str_v"; - protected static final String LONG_V = "long_v"; - protected static final String DBL_V = "dbl_v"; - - @PersistenceContext - protected EntityManager entityManager; +public abstract class AbstractTimeseriesInsertRepository extends AbstractInsertRepository { public abstract void saveOrUpdate(T entity); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java index 3c87e9f909..d4cbd1c994 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleInsertRepository.java @@ -28,15 +28,10 @@ import org.thingsboard.server.dao.util.TimescaleDBTsDao; @Transactional public class TimescaleInsertRepository extends AbstractTimeseriesInsertRepository { - private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null"; - private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null"; - private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null"; - private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null"; - - private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); @Override public void saveOrUpdate(TimescaleTsKvEntity entity) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java new file mode 100644 index 0000000000..cea88a5266 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlLatestInsertRepository.java @@ -0,0 +1,86 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts.ts; + +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; +import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; +import org.thingsboard.server.dao.util.HsqlDao; +import org.thingsboard.server.dao.util.SqlTsDao; + +@SqlTsDao +@HsqlDao +@Repository +@Transactional +public class HsqlLatestInsertRepository extends AbstractLatestInsertRepository { + + private static final String TS_KV_LATEST_CONSTRAINT = "(ts_kv_latest.entity_type=A.entity_type AND ts_kv_latest.entity_id=A.entity_id AND ts_kv_latest.key=A.key)"; + + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, BOOL_V, HSQL_LATEST_ON_BOOL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, STR_V, HSQL_LATEST_ON_STR_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, HSQL_LATEST_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, HSQL_LATEST_ON_DBL_VALUE_UPDATE_SET_NULLS); + + @Override + public void saveOrUpdate(TsKvLatestEntity entity) { + processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); + } + + @Override + protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("bool_v", entity.getBooleanValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateString(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("str_v", entity.getStrValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateLong(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("long_v", entity.getLongValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateDouble(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("dbl_v", entity.getDoubleValue()) + .executeUpdate(); + } +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java index 3ac5cc67a9..927bcd2443 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/HsqlTimeseriesInsertRepository.java @@ -28,19 +28,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @Transactional public class HsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepository { - private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.long_v = null, ts_kv.dbl_v = null "; - private static final String ON_STR_VALUE_UPDATE_SET_NULLS = " ts_kv.bool_v = null, ts_kv.long_v = null, ts_kv.dbl_v = null "; - private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.bool_v = null, ts_kv.dbl_v = null "; - private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = " ts_kv.str_v = null, ts_kv.long_v = null, ts_kv.bool_v = null "; + private static final String TS_KV_CONSTRAINT = "(ts_kv.entity_type=A.entity_type AND ts_kv.entity_id=A.entity_id AND ts_kv.key=A.key AND ts_kv.ts=A.ts)"; - private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); - - private static String getInsertOrUpdateString(String value, String nullValues) { - return "MERGE INTO ts_kv USING(VALUES :entity_type, :entity_id, :key, :ts, :" + value + ") A (entity_type, entity_id, key, ts, " + value + ") ON (ts_kv.entity_type=A.entity_type AND ts_kv.entity_id=A.entity_id AND ts_kv.key=A.key AND ts_kv.ts=A.ts) WHEN MATCHED THEN UPDATE SET ts_kv." + value + " = A." + value + ", ts_kv.ts = A.ts," + nullValues + "WHEN NOT MATCHED THEN INSERT (entity_type, entity_id, key, ts, " + value + ") VALUES (A.entity_type, A.entity_id, A.key, A.ts, A." + value + ")"; - } + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, BOOL_V, HSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, STR_V, HSQL_ON_STR_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V , HSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringHsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, HSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); @Override public void saveOrUpdate(TsKvEntity entity) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java index 9e4e281ebd..b70b59604f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/JpaTimeseriesDao.java @@ -38,6 +38,7 @@ import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestCompositeKey; import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; +import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; import org.thingsboard.server.dao.sqlts.AbstractSqlTimeseriesDao; import org.thingsboard.server.dao.sqlts.AbstractTimeseriesInsertRepository; import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; @@ -69,6 +70,9 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese @Autowired private AbstractTimeseriesInsertRepository insertRepository; + @Autowired + private AbstractLatestInsertRepository insertLatestRepository; + @Override public ListenableFuture> findAllAsync(TenantId tenantId, EntityId entityId, List queries) { return processFindAllAsync(tenantId, entityId, queries); @@ -285,7 +289,7 @@ public class JpaTimeseriesDao extends AbstractSqlTimeseriesDao implements Timese latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null)); latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null)); return insertService.submit(() -> { - tsKvLatestRepository.save(latestEntity); + insertLatestRepository.saveOrUpdate(latestEntity); return null; }); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java new file mode 100644 index 0000000000..c61a74a15d --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlLatestInsertRepository.java @@ -0,0 +1,86 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sqlts.ts; + +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; +import org.thingsboard.server.dao.model.sqlts.ts.TsKvLatestEntity; +import org.thingsboard.server.dao.sqlts.AbstractLatestInsertRepository; +import org.thingsboard.server.dao.util.PsqlDao; +import org.thingsboard.server.dao.util.SqlTsDao; + +@SqlTsDao +@PsqlDao +@Repository +@Transactional +public class PsqlLatestInsertRepository extends AbstractLatestInsertRepository { + + private static final String TS_KV_LATEST_CONSTRAINT = "(entity_type, entity_id, key)"; + + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_LATEST_TABLE, TS_KV_LATEST_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); + + @Override + public void saveOrUpdate(TsKvLatestEntity entity) { + processSaveOrUpdate(entity, INSERT_OR_UPDATE_BOOL_STATEMENT, INSERT_OR_UPDATE_STR_STATEMENT, INSERT_OR_UPDATE_LONG_STATEMENT, INSERT_OR_UPDATE_DBL_STATEMENT); + } + + @Override + protected void saveOrUpdateBoolean(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("bool_v", entity.getBooleanValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateString(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("str_v", entity.getStrValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateLong(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("long_v", entity.getLongValue()) + .executeUpdate(); + } + + @Override + protected void saveOrUpdateDouble(TsKvLatestEntity entity, String query) { + entityManager.createNativeQuery(query) + .setParameter("entity_type", entity.getEntityType().name()) + .setParameter("entity_id", entity.getEntityId()) + .setParameter("key", entity.getKey()) + .setParameter("ts", entity.getTs()) + .setParameter("dbl_v", entity.getDoubleValue()) + .executeUpdate(); + } +} \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java index 4ed91c28f2..6390a7faee 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/ts/PsqlTimeseriesInsertRepository.java @@ -28,19 +28,12 @@ import org.thingsboard.server.dao.util.SqlTsDao; @Transactional public class PsqlTimeseriesInsertRepository extends AbstractTimeseriesInsertRepository { - private static final String ON_BOOL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, dbl_v = null"; - private static final String ON_STR_VALUE_UPDATE_SET_NULLS = "bool_v = null, long_v = null, dbl_v = null"; - private static final String ON_LONG_VALUE_UPDATE_SET_NULLS = "str_v = null, bool_v = null, dbl_v = null"; - private static final String ON_DBL_VALUE_UPDATE_SET_NULLS = "str_v = null, long_v = null, bool_v = null"; + private static final String TS_KV_CONSTRAINT = "(entity_type, entity_id, key, ts)"; - private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateString(BOOL_V, ON_BOOL_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateString(STR_V, ON_STR_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateString(LONG_V , ON_LONG_VALUE_UPDATE_SET_NULLS); - private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateString(DBL_V, ON_DBL_VALUE_UPDATE_SET_NULLS); - - private static String getInsertOrUpdateString(String value, String nullValues) { - return "INSERT INTO ts_kv (entity_type, entity_id, key, ts, " + value + ") VALUES (:entity_type, :entity_id, :key, :ts, :" + value + ") ON CONFLICT (entity_type, entity_id, key, ts) DO UPDATE SET " + value + " = :" + value + ", ts = :ts," + nullValues; - } + private static final String INSERT_OR_UPDATE_BOOL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, BOOL_V, PSQL_ON_BOOL_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_STR_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, STR_V, PSQL_ON_STR_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_LONG_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, LONG_V, PSQL_ON_LONG_VALUE_UPDATE_SET_NULLS); + private static final String INSERT_OR_UPDATE_DBL_STATEMENT = getInsertOrUpdateStringPsql(TS_KV_TABLE, TS_KV_CONSTRAINT, DBL_V, PSQL_ON_DBL_VALUE_UPDATE_SET_NULLS); @Override public void saveOrUpdate(TsKvEntity entity) {