diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index 7d986a12a3..701f585e77 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -93,8 +93,8 @@ public final class PluginProcessingContext implements PluginContext { @Override public void saveAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List attributes, final PluginCallback callback) { validate(entityId, new ValidationCallback(callback, ctx -> { - ListenableFuture> rsListFuture = pluginCtx.attributesService.save(entityId, scope, attributes); - Futures.addCallback(rsListFuture, getListCallback(callback, v -> { + ListenableFuture> attrKvListFuture = pluginCtx.attributesService.save(entityId, scope, attributes); + Futures.addCallback(attrKvListFuture, getListCallback(callback, v -> { if (entityId.getEntityType() == EntityType.DEVICE) { onDeviceAttributesChanged(tenantId, new DeviceId(entityId.getId()), scope, attributes); } @@ -106,7 +106,7 @@ public final class PluginProcessingContext implements PluginContext { @Override public void removeAttributes(final TenantId tenantId, final EntityId entityId, final String scope, final List keys, final PluginCallback callback) { validate(entityId, new ValidationCallback(callback, ctx -> { - ListenableFuture> future = pluginCtx.attributesService.removeAll(entityId, scope, keys); + ListenableFuture> future = pluginCtx.attributesService.removeAll(entityId, scope, keys); Futures.addCallback(future, getCallback(callback, v -> null), executor); if (entityId.getEntityType() == EntityType.DEVICE) { onDeviceAttributesDeleted(tenantId, new DeviceId(entityId.getId()), keys.stream().map(key -> new AttributeKey(scope, key)).collect(Collectors.toSet())); @@ -235,10 +235,10 @@ public final class PluginProcessingContext implements PluginContext { pluginCtx.toDeviceActor(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, scope, values)); } - private FutureCallback> getListCallback(final PluginCallback callback, Function, T> transformer) { - return new FutureCallback>() { + private FutureCallback> getListCallback(final PluginCallback callback, Function, R> transformer) { + return new FutureCallback>() { @Override - public void onSuccess(@Nullable List result) { + public void onSuccess(@Nullable List result) { pluginCtx.self().tell(PluginCallbackMessage.onSuccess(callback, transformer.apply(result)), ActorRef.noSender()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java index f241f2ca8b..d61218f012 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractAsyncDao.java @@ -15,6 +15,13 @@ */ package org.thingsboard.server.dao; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; @@ -39,4 +46,13 @@ public abstract class CassandraAbstractAsyncDao extends CassandraAbstractDao { } } + protected ListenableFuture getFuture(ResultSetFuture future, java.util.function.Function transformer) { + return Futures.transform(future, new Function() { + @Nullable + @Override + public T apply(@Nullable ResultSet input) { + return transformer.apply(input); + } + }, readResultsProcessingExecutor); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java index bd7346acf3..8bb44bf4af 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/CassandraAbstractDao.java @@ -66,7 +66,6 @@ public abstract class CassandraAbstractDao { return execute(statement, defaultWriteLevel); } - protected ResultSetFuture executeAsyncRead(Statement statement) { return executeAsync(statement, defaultReadLevel); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index ae58d4d9c6..4f0af49f41 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.dao.attributes; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -36,7 +34,7 @@ public interface AttributesDao { ListenableFuture> findAll(EntityId entityId, String attributeType); - ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute); + ListenableFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute); - ListenableFuture> removeAll(EntityId entityId, String scope, List keys); + ListenableFuture> removeAll(EntityId entityId, String scope, List keys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 6bf9fb2bd5..6090fa9349 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -15,12 +15,8 @@ */ package org.thingsboard.server.dao.attributes; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import java.util.Collection; @@ -38,7 +34,7 @@ public interface AttributesService { ListenableFuture> findAll(EntityId entityId, String scope); - ListenableFuture> save(EntityId entityId, String scope, List attributes); + ListenableFuture> save(EntityId entityId, String scope, List attributes); - ListenableFuture> removeAll(EntityId entityId, String scope, List attributeKeys); + ListenableFuture> removeAll(EntityId entityId, String scope, List attributeKeys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index da4611973d..bef2faa37f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -61,10 +61,10 @@ public class BaseAttributesService implements AttributesService { } @Override - public ListenableFuture> save(EntityId entityId, String scope, List attributes) { + public ListenableFuture> save(EntityId entityId, String scope, List attributes) { validate(entityId, scope); attributes.forEach(attribute -> validate(attribute)); - List futures = Lists.newArrayListWithExpectedSize(attributes.size()); + List> futures = Lists.newArrayListWithExpectedSize(attributes.size()); for (AttributeKvEntry attribute : attributes) { futures.add(attributesDao.save(entityId, scope, attribute)); } @@ -72,7 +72,7 @@ public class BaseAttributesService implements AttributesService { } @Override - public ListenableFuture> removeAll(EntityId entityId, String scope, List keys) { + public ListenableFuture> removeAll(EntityId entityId, String scope, List keys) { validate(entityId, scope); return attributesDao.removeAll(entityId, scope, keys); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java index db5babecf6..95f92702e0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CassandraBaseAttributesDao.java @@ -101,7 +101,7 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem } @Override - public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { + public ListenableFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { BoundStatement stmt = getSaveStmt().bind(); stmt.setString(0, entityId.getEntityType().name()); stmt.setUUID(1, entityId.getId()); @@ -124,23 +124,31 @@ public class CassandraBaseAttributesDao extends CassandraAbstractAsyncDao implem } else { stmt.setToNull(8); } - return executeAsyncWrite(stmt); + log.trace("Generated save stmt [{}] for entityId {} and attributeType {} and attribute", stmt, entityId, attributeType, attribute); + return getFuture(executeAsyncWrite(stmt), rs -> getAttributeKvEntryFromRs(rs)); + } + + private Void getAttributeKvEntryFromRs(ResultSet rs) { + return null; } @Override - public ListenableFuture> removeAll(EntityId entityId, String attributeType, List keys) { - List futures = keys.stream().map(key -> delete(entityId, attributeType, key)).collect(Collectors.toList()); + public ListenableFuture> removeAll(EntityId entityId, String attributeType, List keys) { + List> futures = keys + .stream() + .map(key -> delete(entityId, attributeType, key)) + .collect(Collectors.toList()); return Futures.allAsList(futures); } - private ResultSetFuture delete(EntityId entityId, String attributeType, String key) { + private ListenableFuture delete(EntityId entityId, String attributeType, String key) { Statement delete = QueryBuilder.delete().all().from(ModelConstants.ATTRIBUTES_KV_CF) .where(eq(ENTITY_TYPE_COLUMN, entityId.getEntityType())) .and(eq(ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ATTRIBUTE_TYPE_COLUMN, attributeType)) .and(eq(ATTRIBUTE_KEY_COLUMN, key)); log.debug("Remove request: {}", delete.toString()); - return getSession().executeAsync(delete); + return getFuture(getSession().executeAsync(delete), rs -> getAttributeKvEntryFromRs(rs)); } private PreparedStatement getSaveStmt() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java index 8415bc9903..0fdd6ca650 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java @@ -298,16 +298,6 @@ public class BaseRelationDao extends CassandraAbstractAsyncDao implements Relati return getFuture(rsFuture, rs -> rs != null ? rs.wasApplied() : false); } - private ListenableFuture getFuture(ResultSetFuture future, java.util.function.Function transformer) { - return Futures.transform(future, new Function() { - @Nullable - @Override - public T apply(@Nullable ResultSet input) { - return transformer.apply(input); - } - }, readResultsProcessingExecutor); - } - private List getEntityRelations(ResultSet rs) { List rows = rs.all(); List entries = new ArrayList<>(rows.size()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java index eb4a639a99..d3a111fa9e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributesDao.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.dao.sql.attributes; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -50,12 +48,12 @@ public class JpaAttributesDao implements AttributesDao { } @Override - public ResultSetFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { + public ListenableFuture save(EntityId entityId, String attributeType, AttributeKvEntry attribute) { return null; } @Override - public ListenableFuture> removeAll(EntityId entityId, String scope, List keys) { + public ListenableFuture> removeAll(EntityId entityId, String scope, List keys) { return null; } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 8590876029..ae7573e046 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -54,12 +54,12 @@ public class JpaTimeseriesDao implements TimeseriesDao { } @Override - public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry) { + public ResultSetFuture save(EntityId entityId, long partition, TsKvEntry tsKvEntry, long ttl) { return null; } @Override - public ResultSetFuture savePartition(EntityId entityId, long partition, String key) { + public ResultSetFuture savePartition(EntityId entityId, long partition, String key, long ttl) { return null; }