Refactored attributes dao
This commit is contained in:
parent
11a2844072
commit
79623fbd3d
@ -93,8 +93,8 @@ public final class PluginProcessingContext implements PluginContext {
|
|||||||
@Override
|
@Override
|
||||||
public void saveAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
|
public void saveAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<AttributeKvEntry> attributes, final PluginCallback<Void> callback) {
|
||||||
validate(entityId, new ValidationCallback(callback, ctx -> {
|
validate(entityId, new ValidationCallback(callback, ctx -> {
|
||||||
ListenableFuture<List<ResultSet>> rsListFuture = pluginCtx.attributesService.save(entityId, scope, attributes);
|
ListenableFuture<List<Void>> attrKvListFuture = pluginCtx.attributesService.save(entityId, scope, attributes);
|
||||||
Futures.addCallback(rsListFuture, getListCallback(callback, v -> {
|
Futures.addCallback(attrKvListFuture, getListCallback(callback, v -> {
|
||||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||||
onDeviceAttributesChanged(tenantId, new DeviceId(entityId.getId()), scope, attributes);
|
onDeviceAttributesChanged(tenantId, new DeviceId(entityId.getId()), scope, attributes);
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ public final class PluginProcessingContext implements PluginContext {
|
|||||||
@Override
|
@Override
|
||||||
public void removeAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
|
public void removeAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List<String> keys, final PluginCallback<Void> callback) {
|
||||||
validate(entityId, new ValidationCallback(callback, ctx -> {
|
validate(entityId, new ValidationCallback(callback, ctx -> {
|
||||||
ListenableFuture<List<ResultSet>> future = pluginCtx.attributesService.removeAll(entityId, scope, keys);
|
ListenableFuture<List<Void>> future = pluginCtx.attributesService.removeAll(entityId, scope, keys);
|
||||||
Futures.addCallback(future, getCallback(callback, v -> null), executor);
|
Futures.addCallback(future, getCallback(callback, v -> null), executor);
|
||||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||||
onDeviceAttributesDeleted(tenantId, new DeviceId(entityId.getId()), keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
|
onDeviceAttributesDeleted(tenantId, new DeviceId(entityId.getId()), keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet()));
|
||||||
@ -235,10 +235,10 @@ public final class PluginProcessingContext implements PluginContext {
|
|||||||
pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
|
pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> FutureCallback<List<ResultSet>> getListCallback(final PluginCallback<T> callback, Function<List<ResultSet>, T> transformer) {
|
private <T, R> FutureCallback<List<T>> getListCallback(final PluginCallback<R> callback, Function<List<T>, R> transformer) {
|
||||||
return new FutureCallback<List<ResultSet>>() {
|
return new FutureCallback<List<T>>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable List<ResultSet> result) {
|
public void onSuccess(@Nullable List<T> result) {
|
||||||
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
|
pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao;
|
package org.thingsboard.server.dao;
|
||||||
|
|
||||||
|
import com.datastax.driver.core.ResultSet;
|
||||||
|
import com.datastax.driver.core.ResultSetFuture;
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@ -39,4 +46,13 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected <T> ListenableFuture<T> getFuture(ResultSetFuture future, java.util.function.Function<ResultSet, T> transformer) {
|
||||||
|
return Futures.transform(future, new Function<ResultSet, T>() {
|
||||||
|
@Nullable
|
||||||
|
@Override
|
||||||
|
public T apply(@Nullable ResultSet input) {
|
||||||
|
return transformer.apply(input);
|
||||||
|
}
|
||||||
|
}, readResultsProcessingExecutor);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -66,7 +66,6 @@ public abstract class CassandraAbstractDao {
|
|||||||
return execute(statement, defaultWriteLevel);
|
return execute(statement, defaultWriteLevel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected ResultSetFuture executeAsyncRead(Statement statement) {
|
protected ResultSetFuture executeAsyncRead(Statement statement) {
|
||||||
return executeAsync(statement, defaultReadLevel);
|
return executeAsync(statement, defaultReadLevel);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.attributes;
|
package org.thingsboard.server.dao.attributes;
|
||||||
|
|
||||||
import com.datastax.driver.core.ResultSet;
|
|
||||||
import com.datastax.driver.core.ResultSetFuture;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
@ -36,7 +34,7 @@ public interface AttributesDao {
|
|||||||
|
|
||||||
ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
|
ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String attributeType);
|
||||||
|
|
||||||
ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
|
ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute);
|
||||||
|
|
||||||
ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys);
|
ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,12 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.attributes;
|
package org.thingsboard.server.dao.attributes;
|
||||||
|
|
||||||
import com.datastax.driver.core.ResultSet;
|
|
||||||
import com.datastax.driver.core.ResultSetFuture;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.UUIDBased;
|
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -38,7 +34,7 @@ public interface AttributesService {
|
|||||||
|
|
||||||
ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
|
ListenableFuture<List<AttributeKvEntry>> findAll(EntityId entityId, String scope);
|
||||||
|
|
||||||
ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
|
ListenableFuture<List<Void>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes);
|
||||||
|
|
||||||
ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
|
ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> attributeKeys);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -61,10 +61,10 @@ public class BaseAttributesService implements AttributesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<ResultSet>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
public ListenableFuture<List<Void>> save(EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
||||||
validate(entityId, scope);
|
validate(entityId, scope);
|
||||||
attributes.forEach(attribute -> validate(attribute));
|
attributes.forEach(attribute -> validate(attribute));
|
||||||
List<ResultSetFuture> futures = Lists.newArrayListWithExpectedSize(attributes.size());
|
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(attributes.size());
|
||||||
for (AttributeKvEntry attribute : attributes) {
|
for (AttributeKvEntry attribute : attributes) {
|
||||||
futures.add(attributesDao.save(entityId, scope, attribute));
|
futures.add(attributesDao.save(entityId, scope, attribute));
|
||||||
}
|
}
|
||||||
@ -72,7 +72,7 @@ public class BaseAttributesService implements AttributesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
|
public ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys) {
|
||||||
validate(entityId, scope);
|
validate(entityId, scope);
|
||||||
return attributesDao.removeAll(entityId, scope, keys);
|
return attributesDao.removeAll(entityId, scope, keys);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -101,7 +101,7 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
|
public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
|
||||||
BoundStatement stmt = getSaveStmt().bind();
|
BoundStatement stmt = getSaveStmt().bind();
|
||||||
stmt.setString(0, entityId.getEntityType().name());
|
stmt.setString(0, entityId.getEntityType().name());
|
||||||
stmt.setUUID(1, entityId.getId());
|
stmt.setUUID(1, entityId.getId());
|
||||||
@ -124,23 +124,31 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem
|
|||||||
} else {
|
} else {
|
||||||
stmt.setToNull(8);
|
stmt.setToNull(8);
|
||||||
}
|
}
|
||||||
return executeAsyncWrite(stmt);
|
log.trace("Generated save stmt [{}] for entityId {} and attributeType {} and attribute", stmt, entityId, attributeType, attribute);
|
||||||
|
return getFuture(executeAsyncWrite(stmt), rs -> getAttributeKvEntryFromRs(rs));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Void getAttributeKvEntryFromRs(ResultSet rs) {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
|
public ListenableFuture<List<Void>> removeAll(EntityId entityId, String attributeType, List<String> keys) {
|
||||||
List<ResultSetFuture> futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList());
|
List<ListenableFuture<Void>> futures = keys
|
||||||
|
.stream()
|
||||||
|
.map(key -> delete(entityId, attributeType, key))
|
||||||
|
.collect(Collectors.toList());
|
||||||
return Futures.allAsList(futures);
|
return Futures.allAsList(futures);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ResultSetFuture delete(EntityId entityId, String attributeType, String key) {
|
private ListenableFuture<Void> delete(EntityId entityId, String attributeType, String key) {
|
||||||
Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
|
Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF)
|
||||||
.where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
|
.where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType()))
|
||||||
.and(eq(ENTITY_ID_COLUMN, entityId.getId()))
|
.and(eq(ENTITY_ID_COLUMN, entityId.getId()))
|
||||||
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
|
.and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType))
|
||||||
.and(eq(ATTRIBUTE_KEY_COLUMN, key));
|
.and(eq(ATTRIBUTE_KEY_COLUMN, key));
|
||||||
log.debug("Remove request: {}", delete.toString());
|
log.debug("Remove request: {}", delete.toString());
|
||||||
return getSession().executeAsync(delete);
|
return getFuture(getSession().executeAsync(delete), rs -> getAttributeKvEntryFromRs(rs));
|
||||||
}
|
}
|
||||||
|
|
||||||
private PreparedStatement getSaveStmt() {
|
private PreparedStatement getSaveStmt() {
|
||||||
|
|||||||
@ -298,16 +298,6 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati
|
|||||||
return getFuture(rsFuture, rs -> rs != null ? rs.wasApplied() : false);
|
return getFuture(rsFuture, rs -> rs != null ? rs.wasApplied() : false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> ListenableFuture<T> getFuture(ResultSetFuture future, java.util.function.Function<ResultSet, T> transformer) {
|
|
||||||
return Futures.transform(future, new Function<ResultSet, T>() {
|
|
||||||
@Nullable
|
|
||||||
@Override
|
|
||||||
public T apply(@Nullable ResultSet input) {
|
|
||||||
return transformer.apply(input);
|
|
||||||
}
|
|
||||||
}, readResultsProcessingExecutor);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<EntityRelation> getEntityRelations(ResultSet rs) {
|
private List<EntityRelation> getEntityRelations(ResultSet rs) {
|
||||||
List<Row> rows = rs.all();
|
List<Row> rows = rs.all();
|
||||||
List<EntityRelation> entries = new ArrayList<>(rows.size());
|
List<EntityRelation> entries = new ArrayList<>(rows.size());
|
||||||
|
|||||||
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.sql.attributes;
|
package org.thingsboard.server.dao.sql.attributes;
|
||||||
|
|
||||||
import com.datastax.driver.core.ResultSet;
|
|
||||||
import com.datastax.driver.core.ResultSetFuture;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
@ -50,12 +48,12 @@ public class JpaAttributesDao implements AttributesDao {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
|
public ListenableFuture<Void> save(EntityId entityId, String attributeType, AttributeKvEntry attribute) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<ResultSet>> removeAll(EntityId entityId, String scope, List<String> keys) {
|
public ListenableFuture<List<Void>> removeAll(EntityId entityId, String scope, List<String> keys) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -54,12 +54,12 @@ public class JpaTimeseriesDao implements TimeseriesDao {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) {
|
public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultSetFuture savePartition(EntityId entityId, long partition, String key) {
|
public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user