diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 856ca8e399..79557f6047 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -16,7 +16,6 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.*; @@ -29,23 +28,17 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.BaseTsKvQuery; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.exception.ThingsboardErrorCode; import org.thingsboard.server.exception.ThingsboardException; import org.thingsboard.server.service.security.model.SecurityUser; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -53,9 +46,6 @@ import java.util.stream.Collectors; @RequestMapping("/api") public class DeviceController extends BaseController { - @Autowired - protected TimeseriesService timeseriesService; - public static final String DEVICE_ID = "deviceId"; @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @@ -377,50 +367,4 @@ public class DeviceController extends BaseController { throw handleException(e); } } - - @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") - @RequestMapping(value = "/device/testSave", method = RequestMethod.GET) - @ResponseBody - public void testSave() throws ThingsboardException { - try { - SecurityUser user = getCurrentUser(); - TenantId tenantId = user.getTenantId(); - - Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test"); - - timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L, - new LongDataEntry("test", 1L))).get(); - timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L, - new LongDataEntry("test", 2L))).get(); - timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L, - new LongDataEntry("test", 3L))).get(); - timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L, - new LongDataEntry("test", 4L))).get(); - timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L, - new LongDataEntry("test", 5L))).get(); - - } catch (Exception e) { - throw handleException(e); - } - } - - @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") - @RequestMapping(value = "/device/testDelete", method = RequestMethod.GET) - @ResponseBody - public void testDelete() throws ThingsboardException { - try { - SecurityUser user = getCurrentUser(); - TenantId tenantId = user.getTenantId(); - - Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test"); - - long startTs = 1519561033000L; - long endTs = 1528260633000L; - timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test", - startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get(); - - } catch (Exception e) { - throw handleException(e); - } - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java index bef82c3677..7915e844e2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/JpaTimeseriesDao.java @@ -300,14 +300,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp @Override public ListenableFuture remove(EntityId entityId, TsKvQuery query) { - //TODO: implement - return null; + return insertService.submit(() -> { + tsKvRepository.delete( + fromTimeUUID(entityId.getId()), + entityId.getEntityType(), + query.getKey(), + query.getStartTs(), + query.getEndTs()); + return null; + }); } @Override public ListenableFuture removeLatest(EntityId entityId, TsKvQuery query) { - //TODO: implement - return null; + TsKvLatestEntity latestEntity = new TsKvLatestEntity(); + latestEntity.setEntityType(entityId.getEntityType()); + latestEntity.setEntityId(fromTimeUUID(entityId.getId())); + latestEntity.setKey(query.getKey()); + return insertService.submit(() -> { + tsKvLatestRepository.delete(latestEntity); + return null; + }); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java index a1d19208b5..2b39d2596e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/timeseries/TsKvRepository.java @@ -16,10 +16,12 @@ package org.thingsboard.server.dao.sql.timeseries; import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.query.Param; import org.springframework.scheduling.annotation.Async; +import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.dao.model.sql.TsKvCompositeKey; import org.thingsboard.server.dao.model.sql.TsKvEntity; @@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") + void delete(@Param("entityId") String entityId, + @Param("entityType") EntityType entityType, + @Param("entityKey") String key, + @Param("startTs") long startTs, + @Param("endTs") long endTs); + @Async @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + @@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") CompletableFuture findMin(@Param("entityId") String entityId, - @Param("entityType") EntityType entityType, - @Param("entityKey") String entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); @Async @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") CompletableFuture findCount(@Param("entityId") String entityId, - @Param("entityType") EntityType entityType, - @Param("entityKey") String entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); @Async @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") CompletableFuture findAvg(@Param("entityId") String entityId, - @Param("entityType") EntityType entityType, - @Param("entityKey") String entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); @Async @@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository :startTs AND tskv.ts < :endTs") CompletableFuture findSum(@Param("entityId") String entityId, - @Param("entityType") EntityType entityType, - @Param("entityKey") String entityKey, - @Param("startTs") long startTs, - @Param("endTs") long endTs); + @Param("entityType") EntityType entityType, + @Param("entityKey") String entityKey, + @Param("startTs") long startTs, + @Param("endTs") long endTs); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 3e4e2bdac5..db61088b39 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -441,7 +441,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem } private ListenableFuture deleteLatest(EntityId entityId, String key) { - Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF) + Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF) .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType())) .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ModelConstants.KEY_COLUMN, key)); @@ -453,25 +453,36 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public ListenableFuture removePartition(EntityId entityId, TsKvQuery query) { long minPartition = toPartitionTs(query.getStartTs()); long maxPartition = toPartitionTs(query.getEndTs()); + if (minPartition == maxPartition) { + return Futures.immediateFuture(null); + } else { + ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); - ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - final ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); + Futures.addCallback(partitionsListFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable List partitions) { + int index = 0; + if (minPartition != query.getStartTs()) { + index = 1; + } + List partitionsToDelete = new ArrayList<>(); + for (int i = index; i < partitions.size() - 1; i++) { + partitionsToDelete.add(partitions.get(i)); + } + TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete); + deletePartitionAsync(cursor, resultFuture); + } - Futures.addCallback(partitionsListFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable List partitions) { - TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions); - deletePartitionAsync(cursor, resultFuture); - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); - } - }, readResultsProcessingExecutor); - return resultFuture; + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t); + } + }, readResultsProcessingExecutor); + return resultFuture; + } } private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture resultFuture) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java index a8d022ba2f..b3a742cda9 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/timeseries/BaseTimeseriesServiceTest.java @@ -93,24 +93,27 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest { Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); } - //TODO: sql delete implement - /*@Test + @Test public void testDeleteDeviceTsData() throws Exception { DeviceId deviceId = new DeviceId(UUIDs.timeBased()); + saveEntries(deviceId, TS - 4); saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 1); - saveEntries(deviceId, TS); tsService.remove(deviceId, Collections.singletonList( - new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get(); + new BaseTsKvQuery(STRING_KEY, TS - 4, TS, 60000, 0, Aggregation.NONE, DESC_ORDER, + false))).get(); List list = tsService.findAll(deviceId, Collections.singletonList( - new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get(); + new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER, + false))).get(); + Assert.assertEquals(1, list.size()); - Assert.assertEquals(2, list.size()); - }*/ + List latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get(); + Assert.assertEquals(null, latest.get(0).getValueAsString()); + } @Test public void testFindDeviceTsData() throws Exception {