Added ts kv latest and ts kv jps dao support
This commit is contained in:
parent
4486d07f6d
commit
d6d2de6d9d
@ -15,8 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.data.kv;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface TsKvQuery {
|
||||
|
||||
String getKey();
|
||||
|
||||
@ -16,6 +16,8 @@
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.kv.*;
|
||||
import org.thingsboard.server.dao.model.ToData;
|
||||
|
||||
@ -29,6 +31,8 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
|
||||
@Entity
|
||||
@Table(name = "attribute_kv")
|
||||
@IdClass(AttributeKvCompositeKey.class)
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public class AttributeKvEntity implements ToData<AttributeKvEntry>, Serializable {
|
||||
|
||||
@Id
|
||||
|
||||
@ -17,15 +17,23 @@ package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
|
||||
import javax.persistence.Transient;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public class RelationCompositeKey implements Serializable {
|
||||
|
||||
@Transient
|
||||
private static final long serialVersionUID = -4089175869616037592L;
|
||||
|
||||
private UUID fromId;
|
||||
private String fromType;
|
||||
private UUID toId;
|
||||
|
||||
@ -17,6 +17,8 @@ package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.hibernate.annotations.Type;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
@ -32,11 +34,10 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
|
||||
@Entity
|
||||
@Table(name = RELATION_COLUMN_FAMILY_NAME)
|
||||
@IdClass(RelationCompositeKey.class)
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public final class RelationEntity implements ToData<EntityRelation> {
|
||||
|
||||
@Transient
|
||||
private static final long serialVersionUID = -4089175869616037592L;
|
||||
|
||||
@Id
|
||||
@Column(name = RELATION_FROM_ID_PROPERTY)
|
||||
private UUID fromId;
|
||||
@ -83,88 +84,6 @@ public final class RelationEntity implements ToData<EntityRelation> {
|
||||
this.additionalInfo = relation.getAdditionalInfo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((additionalInfo == null) ? 0 : additionalInfo.hashCode());
|
||||
result = prime * result + ((toId == null) ? 0 : toId.hashCode());
|
||||
result = prime * result + ((toType == null) ? 0 : toType.hashCode());
|
||||
result = prime * result + ((fromId == null) ? 0 : fromId.hashCode());
|
||||
result = prime * result + ((fromType == null) ? 0 : fromType.hashCode());
|
||||
result = prime * result + ((relationType == null) ? 0 : relationType.hashCode());
|
||||
result = prime * result + ((relationTypeGroup == null) ? 0 : relationTypeGroup.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
RelationEntity other = (RelationEntity) obj;
|
||||
if (additionalInfo == null) {
|
||||
if (other.additionalInfo != null)
|
||||
return false;
|
||||
} else if (!additionalInfo.equals(other.additionalInfo))
|
||||
return false;
|
||||
if (toId == null) {
|
||||
if (other.toId != null)
|
||||
return false;
|
||||
} else if (!toId.equals(other.toId))
|
||||
return false;
|
||||
if (fromId == null) {
|
||||
if (other.fromId != null)
|
||||
return false;
|
||||
} else if (!fromId.equals(other.fromId))
|
||||
return false;
|
||||
if (toType == null) {
|
||||
if (other.toType != null)
|
||||
return false;
|
||||
} else if (!toType.equals(other.toType))
|
||||
return false;
|
||||
if (fromType == null) {
|
||||
if (other.fromType != null)
|
||||
return false;
|
||||
} else if (!fromType.equals(other.fromType))
|
||||
return false;
|
||||
if (relationType == null) {
|
||||
if (other.relationType != null)
|
||||
return false;
|
||||
} else if (!relationType.equals(other.relationType))
|
||||
return false;
|
||||
if (relationTypeGroup == null) {
|
||||
if (other.relationTypeGroup != null)
|
||||
return false;
|
||||
} else if (!relationTypeGroup.equals(other.relationTypeGroup))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("AssetEntity [toId=");
|
||||
builder.append(toId);
|
||||
builder.append(", toType=");
|
||||
builder.append(toType);
|
||||
builder.append(", fromId=");
|
||||
builder.append(fromId);
|
||||
builder.append(", fromType=");
|
||||
builder.append(fromType);
|
||||
builder.append(", relationType=");
|
||||
builder.append(relationType);
|
||||
builder.append(", relationTypeGroup=");
|
||||
builder.append(relationTypeGroup);
|
||||
builder.append(", additionalInfo=");
|
||||
builder.append(additionalInfo);
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityRelation toData() {
|
||||
EntityRelation relation = new EntityRelation();
|
||||
|
||||
@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
|
||||
import javax.persistence.Transient;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public class TsKvCompositeKey implements Serializable{
|
||||
|
||||
@Transient
|
||||
private static final long serialVersionUID = -4089175869616037523L;
|
||||
|
||||
private String entityType;
|
||||
private UUID entityId;
|
||||
private String key;
|
||||
private long ts;
|
||||
}
|
||||
@ -0,0 +1,69 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.dao.model.ToData;
|
||||
|
||||
import javax.persistence.*;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.*;
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "ts_kv")
|
||||
@IdClass(TsKvCompositeKey.class)
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public final class TsKvEntity implements ToData<TsKvEntry> {
|
||||
|
||||
@Id
|
||||
@Column(name = ENTITY_TYPE_COLUMN)
|
||||
private String entityType;
|
||||
|
||||
@Id
|
||||
@Column(name = ENTITY_ID_COLUMN)
|
||||
private UUID entityId;
|
||||
|
||||
@Id
|
||||
@Column(name = KEY_COLUMN)
|
||||
private String key;
|
||||
|
||||
@Id
|
||||
@Column(name = TS_COLUMN)
|
||||
private long ts;
|
||||
|
||||
@Column(name = BOOLEAN_VALUE_COLUMN)
|
||||
private Boolean booleanValue;
|
||||
|
||||
@Column(name = STRING_VALUE_COLUMN)
|
||||
private String strValue;
|
||||
|
||||
@Column(name = LONG_VALUE_COLUMN)
|
||||
private Long longValue;
|
||||
|
||||
@Column(name = DOUBLE_VALUE_COLUMN)
|
||||
private Double doubleValue;
|
||||
|
||||
@Override
|
||||
public TsKvEntry toData() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
|
||||
import javax.persistence.Transient;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public class TsKvLatestCompositeKey implements Serializable{
|
||||
|
||||
@Transient
|
||||
private static final long serialVersionUID = -4089175869616037523L;
|
||||
|
||||
private String entityType;
|
||||
private UUID entityId;
|
||||
private String key;
|
||||
}
|
||||
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.dao.model.ToData;
|
||||
|
||||
import javax.persistence.*;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.thingsboard.server.dao.model.ModelConstants.*;
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "ts_kv_latest")
|
||||
@IdClass(TsKvLatestCompositeKey.class)
|
||||
@EqualsAndHashCode
|
||||
@ToString
|
||||
public final class TsKvLatestEntity implements ToData<TsKvEntry> {
|
||||
|
||||
@Id
|
||||
@Column(name = ENTITY_TYPE_COLUMN)
|
||||
private String entityType;
|
||||
|
||||
@Id
|
||||
@Column(name = ENTITY_ID_COLUMN)
|
||||
private UUID entityId;
|
||||
|
||||
@Id
|
||||
@Column(name = KEY_COLUMN)
|
||||
private String key;
|
||||
|
||||
@Column(name = TS_COLUMN)
|
||||
private long ts;
|
||||
|
||||
@Column(name = BOOLEAN_VALUE_COLUMN)
|
||||
private Boolean booleanValue;
|
||||
|
||||
@Column(name = STRING_VALUE_COLUMN)
|
||||
private String strValue;
|
||||
|
||||
@Column(name = LONG_VALUE_COLUMN)
|
||||
private Long longValue;
|
||||
|
||||
@Column(name = DOUBLE_VALUE_COLUMN)
|
||||
private Double doubleValue;
|
||||
|
||||
@Override
|
||||
public TsKvEntry toData() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -15,13 +15,20 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.timeseries;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvQuery;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvEntity;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesDao;
|
||||
|
||||
import java.util.List;
|
||||
@ -29,41 +36,80 @@ import java.util.List;
|
||||
@Component
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "sql", value = "enabled", havingValue = "true")
|
||||
public class JpaTimeseriesDao implements TimeseriesDao {
|
||||
public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao {
|
||||
|
||||
@Override
|
||||
public long toPartitionTs(long ts) {
|
||||
return 0;
|
||||
}
|
||||
@Autowired
|
||||
private TsKvRepository tsKvRepository;
|
||||
|
||||
@Autowired
|
||||
private TsKvLatestRepository tsKvLatestRepository;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
|
||||
return null;
|
||||
}
|
||||
|
||||
private ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, TsKvQuery query) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key) {
|
||||
return null;
|
||||
TsKvLatestCompositeKey compositeKey =
|
||||
new TsKvLatestCompositeKey(
|
||||
entityId.getEntityType().name(),
|
||||
entityId.getId(),
|
||||
key);
|
||||
return service.submit(() ->
|
||||
DaoUtil.getData(tsKvLatestRepository.findOne(compositeKey)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId) {
|
||||
return null;
|
||||
return service.submit(() ->
|
||||
DaoUtil.convertDataList(Lists.newArrayList(
|
||||
tsKvLatestRepository.findAllByEntityTypeAndEntityId(
|
||||
entityId.getEntityType().name(),
|
||||
entityId.getId()))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
|
||||
return null;
|
||||
public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
TsKvEntity entity = new TsKvEntity();
|
||||
entity.setEntityType(entityId.getEntityType().name());
|
||||
entity.setEntityId(entityId.getId());
|
||||
entity.setTs(tsKvEntry.getTs());
|
||||
entity.setKey(tsKvEntry.getKey());
|
||||
entity.setStrValue(tsKvEntry.getStrValue().orElse(null));
|
||||
entity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
|
||||
entity.setLongValue(tsKvEntry.getLongValue().orElse(null));
|
||||
entity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
|
||||
return service.submit(() -> {
|
||||
tsKvRepository.save(entity);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
|
||||
public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry) {
|
||||
return null;
|
||||
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
|
||||
latestEntity.setEntityType(entityId.getEntityType().name());
|
||||
latestEntity.setEntityId(entityId.getId());
|
||||
latestEntity.setTs(tsKvEntry.getTs());
|
||||
latestEntity.setKey(tsKvEntry.getKey());
|
||||
latestEntity.setStrValue(tsKvEntry.getStrValue().orElse(null));
|
||||
latestEntity.setDoubleValue(tsKvEntry.getDoubleValue().orElse(null));
|
||||
latestEntity.setLongValue(tsKvEntry.getLongValue().orElse(null));
|
||||
latestEntity.setBooleanValue(tsKvEntry.getBooleanValue().orElse(null));
|
||||
return service.submit(() -> {
|
||||
tsKvLatestRepository.save(latestEntity);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,30 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.timeseries;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.data.repository.CrudRepository;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvLatestEntity;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@ConditionalOnProperty(prefix = "sql", value = "enabled", havingValue = "true")
|
||||
public interface TsKvLatestRepository extends CrudRepository<TsKvLatestEntity, TsKvLatestCompositeKey> {
|
||||
|
||||
List<TsKvLatestEntity> findAllByEntityTypeAndEntityId(String entityType, UUID entityId);
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.timeseries;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.data.repository.CrudRepository;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sql.TsKvEntity;
|
||||
|
||||
@ConditionalOnProperty(prefix = "sql", value = "enabled", havingValue = "true")
|
||||
public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvCompositeKey> {
|
||||
}
|
||||
@ -72,9 +72,8 @@ public class BaseTimeseriesService implements TimeseriesService {
|
||||
if (tsKvEntry == null) {
|
||||
throw new IncorrectParameterException("Key value entry can't be null");
|
||||
}
|
||||
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
|
||||
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY);
|
||||
saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, 0L);
|
||||
saveAndRegisterFutures(futures, entityId, tsKvEntry, 0L);
|
||||
return Futures.allAsList(futures);
|
||||
}
|
||||
|
||||
@ -85,16 +84,15 @@ public class BaseTimeseriesService implements TimeseriesService {
|
||||
if (tsKvEntry == null) {
|
||||
throw new IncorrectParameterException("Key value entry can't be null");
|
||||
}
|
||||
long partitionTs = timeseriesDao.toPartitionTs(tsKvEntry.getTs());
|
||||
saveAndRegisterFutures(futures, entityId, tsKvEntry, partitionTs, ttl);
|
||||
saveAndRegisterFutures(futures, entityId, tsKvEntry, ttl);
|
||||
}
|
||||
return Futures.allAsList(futures);
|
||||
}
|
||||
|
||||
private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long partitionTs, long ttl) {
|
||||
futures.add(timeseriesDao.savePartition(entityId, partitionTs, tsKvEntry.getKey(), ttl));
|
||||
private void saveAndRegisterFutures(List<ListenableFuture<Void>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
futures.add(timeseriesDao.savePartition(entityId, tsKvEntry.getTs(), tsKvEntry.getKey(), ttl));
|
||||
futures.add(timeseriesDao.saveLatest(entityId, tsKvEntry));
|
||||
futures.add(timeseriesDao.save(entityId, partitionTs, tsKvEntry, ttl));
|
||||
futures.add(timeseriesDao.save(entityId, tsKvEntry, ttl));
|
||||
}
|
||||
|
||||
private static void validate(EntityId entityId) {
|
||||
|
||||
@ -89,12 +89,6 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
||||
super.stopExecutor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long toPartitionTs(long ts) {
|
||||
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
|
||||
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries) {
|
||||
List<ListenableFuture<List<TsKvEntry>>> futures = queries.stream().map(query -> findAllAsync(entityId, query)).collect(Collectors.toList());
|
||||
@ -163,6 +157,11 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
||||
return resultFuture;
|
||||
}
|
||||
|
||||
private long toPartitionTs(long ts) {
|
||||
LocalDateTime time = LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC);
|
||||
return tsFormat.truncatedTo(time).toInstant(ZoneOffset.UTC).toEpochMilli();
|
||||
}
|
||||
|
||||
private void findAllAsyncSequentiallyWithLimit(final TsKvQueryCursor cursor, final SimpleListenableFuture<List<TsKvEntry>> resultFuture) {
|
||||
if (cursor.isFull() || !cursor.hasNextPartition()) {
|
||||
resultFuture.set(cursor.getData());
|
||||
@ -259,7 +258,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
|
||||
public ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
long partition = toPartitionTs(tsKvEntry.getTs());
|
||||
DataType type = tsKvEntry.getDataType();
|
||||
BoundStatement stmt = (ttl == 0 ? getSaveStmt(type) : getSaveTtlStmt(type)).bind();
|
||||
stmt.setString(0, entityId.getEntityType().name())
|
||||
@ -275,7 +275,8 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl) {
|
||||
public ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl) {
|
||||
long partition = toPartitionTs(tsKvEntryTs);
|
||||
log.debug("Saving partition {} for the entity [{}-{}] and key {}", partition, entityId.getEntityType(), entityId.getId(), key);
|
||||
BoundStatement stmt = (ttl == 0 ? getPartitionInsertStmt() : getPartitionInsertTtlStmt()).bind();
|
||||
stmt = stmt.setString(0, entityId.getEntityType().name())
|
||||
|
||||
@ -27,17 +27,15 @@ import java.util.List;
|
||||
*/
|
||||
public interface TimeseriesDao {
|
||||
|
||||
long toPartitionTs(long ts);
|
||||
|
||||
ListenableFuture<List<TsKvEntry>> findAllAsync(EntityId entityId, List<TsKvQuery> queries);
|
||||
|
||||
ListenableFuture<TsKvEntry> findLatest(EntityId entityId, String key);
|
||||
|
||||
ListenableFuture<List<TsKvEntry>> findAllLatest(EntityId entityId);
|
||||
|
||||
ListenableFuture<Void> save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl);
|
||||
ListenableFuture<Void> save(EntityId entityId, TsKvEntry tsKvEntry, long ttl);
|
||||
|
||||
ListenableFuture<Void> savePartition(EntityId entityId, long partition, String key, long ttl);
|
||||
ListenableFuture<Void> savePartition(EntityId entityId, long tsKvEntryTs, String key, long ttl);
|
||||
|
||||
ListenableFuture<Void> saveLatest(EntityId entityId, TsKvEntry tsKvEntry);
|
||||
}
|
||||
|
||||
@ -51,18 +51,6 @@ CREATE TABLE IF NOT EXISTS alarm (
|
||||
);
|
||||
ALTER TABLE alarm OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS relation (
|
||||
from_id uuid,
|
||||
from_type character varying(255),
|
||||
to_id uuid,
|
||||
to_type character varying(255),
|
||||
relation_type_group character varying(255),
|
||||
relation_type character varying(255),
|
||||
additional_info jsonb,
|
||||
CONSTRAINT relation_unq_key UNIQUE (from_id, from_type, relation_type_group, relation_type, to_id, to_type)
|
||||
);
|
||||
ALTER TABLE relation OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS asset (
|
||||
id uuid NOT NULL CONSTRAINT asset_pkey PRIMARY KEY,
|
||||
additional_info jsonb,
|
||||
@ -172,6 +160,18 @@ CREATE TABLE IF NOT EXISTS plugin (
|
||||
);
|
||||
ALTER TABLE plugin OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS relation (
|
||||
from_id uuid,
|
||||
from_type character varying(255),
|
||||
to_id uuid,
|
||||
to_type character varying(255),
|
||||
relation_type_group character varying(255),
|
||||
relation_type character varying(255),
|
||||
additional_info jsonb,
|
||||
CONSTRAINT relation_unq_key UNIQUE (from_id, from_type, relation_type_group, relation_type, to_id, to_type)
|
||||
);
|
||||
ALTER TABLE relation OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS rule (
|
||||
id uuid NOT NULL CONSTRAINT rule_pkey PRIMARY KEY,
|
||||
action jsonb,
|
||||
@ -217,6 +217,32 @@ CREATE TABLE IF NOT EXISTS tenant (
|
||||
);
|
||||
ALTER TABLE tenant OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ts_kv (
|
||||
entity_type character varying(255) NOT NULL,
|
||||
entity_id uuid NOT NULL,
|
||||
key character varying(255) NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v character varying(255),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
CONSTRAINT ts_kv_unq_key UNIQUE (entity_type, entity_id, key, ts)
|
||||
);
|
||||
ALTER TABLE ts_kv OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS ts_kv_latest (
|
||||
entity_type character varying(255) NOT NULL,
|
||||
entity_id uuid NOT NULL,
|
||||
key character varying(255) NOT NULL,
|
||||
ts bigint NOT NULL,
|
||||
bool_v boolean,
|
||||
str_v character varying(255),
|
||||
long_v bigint,
|
||||
dbl_v double precision,
|
||||
CONSTRAINT ts_kv_latest_unq_key UNIQUE (entity_type, entity_id, key)
|
||||
);
|
||||
ALTER TABLE ts_kv OWNER TO postgres;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS user_credentials (
|
||||
id uuid NOT NULL CONSTRAINT user_credentials_pkey PRIMARY KEY,
|
||||
activate_token character varying(255) UNIQUE,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user