diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java index 9b7ffd2d11..2a9c073d34 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java @@ -29,7 +29,6 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; -import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntitySubtype; @@ -39,7 +38,6 @@ import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; @@ -47,7 +45,6 @@ import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; -import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.service.security.model.SecurityUser; @@ -174,7 +171,7 @@ public class EntityViewController extends BaseController { EntityView entityView = checkEntityViewId(entityViewId); entityViewService.deleteEntityView(entityViewId); logEntityAction(entityViewId, entityView, entityView.getCustomerId(), - ActionType.DELETED,null, strEntityViewId); + ActionType.DELETED, null, strEntityViewId); } catch (Exception e) { logEntityAction(emptyId(EntityType.ENTITY_VIEW), null, @@ -184,11 +181,24 @@ public class EntityViewController extends BaseController { } } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/tenant/entityViews", params = {"entityViewName"}, method = RequestMethod.GET) + @ResponseBody + public EntityView getTenantEntityView( + @RequestParam String entityViewName) throws ThingsboardException { + try { + TenantId tenantId = getCurrentUser().getTenantId(); + return checkNotNull(entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName)); + } catch (Exception e) { + throw handleException(e); + } + } + @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/customer/{customerId}/entityView/{entityViewId}", method = RequestMethod.POST) @ResponseBody public EntityView assignEntityViewToCustomer(@PathVariable(CUSTOMER_ID) String strCustomerId, - @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException { + @PathVariable(ENTITY_VIEW_ID) String strEntityViewId) throws ThingsboardException { checkParameter(CUSTOMER_ID, strCustomerId); checkParameter(ENTITY_VIEW_ID, strEntityViewId); try { diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 6c37614147..ef09a7a62c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -49,9 +49,11 @@ import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -60,12 +62,10 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.telemetry.AttributeData; -import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.telemetry.TsData; import org.thingsboard.server.service.telemetry.exception.InvalidParametersException; import org.thingsboard.server.service.telemetry.exception.UncheckedApiException; @@ -249,6 +249,60 @@ public class TelemetryController extends BaseController { return saveTelemetry(entityId, requestBody, ttl); } + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/delete", method = RequestMethod.DELETE) + @ResponseBody + public DeferredResult deleteEntityTimeseries(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, + @RequestParam(name = "keys") String keysStr, + @RequestParam(name = "deleteAllDataForKeys", defaultValue = "false") boolean deleteAllDataForKeys, + @RequestParam(name = "startTs", required = false) Long startTs, + @RequestParam(name = "endTs", required = false) Long endTs, + @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException { + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); + return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted); + } + + private DeferredResult deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys, + Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException { + List keys = toKeysList(keysStr); + if (keys.isEmpty()) { + return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); + } + SecurityUser user = getCurrentUser(); + + long deleteFromTs; + long deleteToTs; + if (deleteAllDataForKeys) { + deleteFromTs = 0L; + deleteToTs = System.currentTimeMillis(); + } else { + deleteFromTs = startTs; + deleteToTs = endTs; + } + + return accessValidator.validateEntityAndCallback(user, entityIdStr, (result, entityId) -> { + List deleteTsKvQueries = new ArrayList<>(); + for (String key : keys) { + deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); + } + + ListenableFuture> future = tsService.remove(entityId, deleteTsKvQueries); + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List tmp) { + logTimeseriesDeleted(user, entityId, keys, null); + result.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + + @Override + public void onFailure(Throwable t) { + logTimeseriesDeleted(user, entityId, keys, t); + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }, executor); + }); + } + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/{deviceId}/{scope}", method = RequestMethod.DELETE) @ResponseBody @@ -506,6 +560,15 @@ public class TelemetryController extends BaseController { }; } + private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List keys, Throwable e) { + try { + logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e), + keys); + } catch (ThingsboardException te) { + log.warn("Failed to log timeseries delete", te); + } + } + private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List keys, Throwable e) { try { logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java index c37d4607c3..822387cfcb 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java @@ -24,6 +24,7 @@ public enum ActionType { UPDATED(false), // log entity ATTRIBUTES_UPDATED(false), // log attributes/values ATTRIBUTES_DELETED(false), // log attributes + TIMESERIES_DELETED(false), // log timeseries RPC_CALL(false), // log method and params CREDENTIALS_UPDATED(false), // log new credentials ASSIGNED_TO_CUSTOMER(false), // log customer name @@ -32,11 +33,11 @@ public enum ActionType { SUSPENDED(false), // log string id CREDENTIALS_READ(true), // log device id ATTRIBUTES_READ(true), // log attributes - RELATION_ADD_OR_UPDATE (false), - RELATION_DELETED (false), - RELATIONS_DELETED (false), - ALARM_ACK (false), - ALARM_CLEAR (false); + RELATION_ADD_OR_UPDATE(false), + RELATION_DELETED(false), + RELATIONS_DELETED(false), + ALARM_ACK(false), + ALARM_CLEAR(false); private final boolean isRead; diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java index da87b44322..9c828663c1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java @@ -43,6 +43,8 @@ public interface EntityViewService { EntityView findEntityViewById(EntityViewId entityViewId); + EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name); + TextPageData findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink); TextPageData findEntityViewByTenantIdAndType(TenantId tenantId, TextPageLink pageLink, String type); diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java index 2d94cc2eab..9f8949dc4e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/EntityViewServiceImpl.java @@ -29,8 +29,6 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Caching; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.Customer; -import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; @@ -40,12 +38,10 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; -import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.customer.CustomerDao; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.exception.DataValidationException; @@ -56,15 +52,13 @@ import org.thingsboard.server.dao.tenant.TenantDao; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Optional; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.CacheConstants.ENTITY_VIEW_CACHE; -import static org.thingsboard.server.common.data.CacheConstants.RELATIONS_CACHE; import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; import static org.thingsboard.server.dao.service.Validator.validateId; import static org.thingsboard.server.dao.service.Validator.validatePageLink; @@ -96,6 +90,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti @Caching(evict = { @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.entityId}"), + @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.tenantId, #entityView.name}"), @CacheEvict(cacheNames = ENTITY_VIEW_CACHE, key = "{#entityView.id}")}) @Override public EntityView saveEntityView(EntityView entityView) { @@ -137,6 +132,15 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti return entityViewDao.findById(entityViewId.getId()); } + @Cacheable(cacheNames = ENTITY_VIEW_CACHE, key = "{#tenantId, #name}") + @Override + public EntityView findEntityViewByTenantIdAndName(TenantId tenantId, String name) { + log.trace("Executing findEntityViewByTenantIdAndName [{}][{}]", tenantId, name); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + Optional entityViewOpt = entityViewDao.findEntityViewByTenantIdAndName(tenantId.getId(), name); + return entityViewOpt.orElse(null); + } + @Override public TextPageData findEntityViewByTenantId(TenantId tenantId, TextPageLink pageLink) { log.trace("Executing findEntityViewsByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); @@ -255,6 +259,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti deleteEntityRelations(entityViewId); EntityView entityView = entityViewDao.findById(entityViewId.getId()); cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getEntityId())); + cacheManager.getCache(ENTITY_VIEW_CACHE).evict(Arrays.asList(entityView.getTenantId(), entityView.getName())); entityViewDao.removeById(entityViewId.getId()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index 5bd9175628..04227a3825 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.timeseries; import com.google.common.base.Function; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -31,6 +32,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.UUIDConverter; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -41,9 +43,9 @@ import org.thingsboard.server.dao.model.sql.TsKvEntity; import org.thingsboard.server.dao.model.sql.TsKvLatestCompositeKey; import org.thingsboard.server.dao.model.sql.TsKvLatestEntity; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; +import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; import org.thingsboard.server.dao.timeseries.TimeseriesDao; import org.thingsboard.server.dao.timeseries.TsInsertExecutorType; -import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsDao; import javax.annotation.Nullable; @@ -53,6 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -64,6 +67,8 @@ import static org.thingsboard.server.common.data.UUIDConverter.fromTimeUUID; @SqlTsDao public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService implements TimeseriesDao { + private static final String DESC_ORDER = "DESC"; + @Value("${sql.ts_inserts_executor_type}") private String insertExecutorType; @@ -326,14 +331,72 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp @Override public ListenableFuture removeLatest(EntityId entityId, DeleteTsKvQuery query) { - TsKvLatestEntity latestEntity = new TsKvLatestEntity(); - latestEntity.setEntityType(entityId.getEntityType()); - latestEntity.setEntityId(fromTimeUUID(entityId.getId())); - latestEntity.setKey(query.getKey()); - return service.submit(() -> { - tsKvLatestRepository.delete(latestEntity); - return null; + ListenableFuture latestFuture = findLatest(entityId, query.getKey()); + + ListenableFuture booleanFuture = Futures.transform(latestFuture, tsKvEntry -> { + long ts = tsKvEntry.getTs(); + return ts > query.getStartTs() && ts <= query.getEndTs(); + }, service); + + ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + if (isRemove) { + TsKvLatestEntity latestEntity = new TsKvLatestEntity(); + latestEntity.setEntityType(entityId.getEntityType()); + latestEntity.setEntityId(fromTimeUUID(entityId.getId())); + latestEntity.setKey(query.getKey()); + return service.submit(() -> { + tsKvLatestRepository.delete(latestEntity); + return null; + }); + } + return Futures.immediateFuture(null); + }, service); + + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + Futures.addCallback(removedLatestFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + if (query.getRewriteLatestIfDeleted()) { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + if (isRemove) { + return getNewLatestEntryFuture(entityId, query); + } + return Futures.immediateFuture(null); + }, service); + + try { + resultFuture.set(savedLatestFuture.get()); + } catch (InterruptedException | ExecutionException e) { + log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e); + } + } else { + resultFuture.set(null); + } + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to process remove of the latest value", entityId, t); + } }); + return resultFuture; + } + + private ListenableFuture getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) { + long startTs = 0; + long endTs = query.getStartTs() - 1; + ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, + Aggregation.NONE, DESC_ORDER); + ListenableFuture> future = findAllAsync(entityId, findNewLatestQuery); + + return Futures.transformAsync(future, entryList -> { + if (entryList.size() == 1) { + return saveLatest(entityId, entryList.get(0)); + } else { + log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); + } + return Futures.immediateFuture(null); + }, service); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java index 4c743e5ab6..296d173cf7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java @@ -47,7 +47,7 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") + "AND tskv.ts > :startTs AND tskv.ts <= :endTs") void delete(@Param("entityId") String entityId, @Param("entityType") EntityType entityType, @Param("entityKey") String key, diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 709bfd5432..fdc69f9262 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -48,7 +48,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.CassandraAbstractAsyncDao; -import org.thingsboard.server.dao.util.NoSqlDao; import org.thingsboard.server.dao.util.NoSqlTsDao; import javax.annotation.Nullable; @@ -62,6 +61,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; @@ -434,14 +434,14 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public ListenableFuture removeLatest(EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestEntryFuture = findLatest(entityId, query.getKey()); - ListenableFuture booleanFuture = Futures.transformAsync(latestEntryFuture, latestEntry -> { + ListenableFuture booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> { long ts = latestEntry.getTs(); - if (ts >= query.getStartTs() && ts <= query.getEndTs()) { - return Futures.immediateFuture(true); + if (ts > query.getStartTs() && ts <= query.getEndTs()) { + return true; } else { log.trace("Won't be deleted latest value for [{}], key - {}", entityId, query.getKey()); } - return Futures.immediateFuture(false); + return false; }, readResultsProcessingExecutor); ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { @@ -451,18 +451,34 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem return Futures.immediateFuture(null); }, readResultsProcessingExecutor); - if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { - if (isRemove) { - return getNewLatestEntryFuture(entityId, query); - } - return Futures.immediateFuture(null); - }, readResultsProcessingExecutor); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + Futures.addCallback(removedLatestFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void result) { + if (query.getRewriteLatestIfDeleted()) { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + if (isRemove) { + return getNewLatestEntryFuture(entityId, query); + } + return Futures.immediateFuture(null); + }, readResultsProcessingExecutor); - return Futures.transformAsync(Futures.allAsList(Arrays.asList(savedLatestFuture, removedLatestFuture)), - list -> Futures.immediateFuture(null), readResultsProcessingExecutor); - } - return removedLatestFuture; + try { + resultFuture.set(savedLatestFuture.get()); + } catch (InterruptedException | ExecutionException e) { + log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e); + } + } else { + resultFuture.set(null); + } + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to process remove of the latest value", entityId, t); + } + }); + return resultFuture; } private ListenableFuture getNewLatestEntryFuture(EntityId entityId, DeleteTsKvQuery query) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index 88f4d84eb6..81de40ac82 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -152,7 +152,7 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { } @Test - public void testDeleteDeviceTsData() throws Exception { + public void testDeleteDeviceTsDataWithoutOverwritingLatest() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); saveEntries(deviceId, 10000); @@ -171,6 +171,26 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Assert.assertEquals(null, latest.get(0).getValueAsString()); } + @Test + public void testDeleteDeviceTsDataWithOverwritingLatest() throws Exception { + DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + + saveEntries(deviceId, 10000); + saveEntries(deviceId, 20000); + saveEntries(deviceId, 30000); + saveEntries(deviceId, 40000); + + tsService.remove(deviceId, Collections.singletonList( + new BaseDeleteTsKvQuery(STRING_KEY, 25000, 45000, true))).get(); + + List list = tsService.findAll(deviceId, Collections.singletonList( + new BaseReadTsKvQuery(STRING_KEY, 5000, 45000, 10000, 10, Aggregation.NONE))).get(); + Assert.assertEquals(2, list.size()); + + List latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get(); + Assert.assertEquals(20000, latest.get(0).getTs()); + } + @Test public void testFindDeviceTsData() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased());