jpa delete timeseries implementation, partitions delete fix

This commit is contained in:
Dima Landiak 2018-06-05 18:25:39 +03:00
parent e37e7242fd
commit ce9967488b
5 changed files with 84 additions and 100 deletions

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.controller; package org.thingsboard.server.controller;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
@ -29,23 +28,17 @@ import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.exception.ThingsboardErrorCode; import org.thingsboard.server.exception.ThingsboardErrorCode;
import org.thingsboard.server.exception.ThingsboardException; import org.thingsboard.server.exception.ThingsboardException;
import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -53,9 +46,6 @@ import java.util.stream.Collectors;
@RequestMapping("/api") @RequestMapping("/api")
public class DeviceController extends BaseController { public class DeviceController extends BaseController {
@Autowired
protected TimeseriesService timeseriesService;
public static final String DEVICE_ID = "deviceId"; public static final String DEVICE_ID = "deviceId";
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@ -377,50 +367,4 @@ public class DeviceController extends BaseController {
throw handleException(e); throw handleException(e);
} }
} }
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/device/testSave", method = RequestMethod.GET)
@ResponseBody
public void testSave() throws ThingsboardException {
try {
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
timeseriesService.save(device.getId(), new BasicTsKvEntry(1516892633000L,
new LongDataEntry("test", 1L))).get();
timeseriesService.save(device.getId(), new BasicTsKvEntry(1519571033000L,
new LongDataEntry("test", 2L))).get();
timeseriesService.save(device.getId(), new BasicTsKvEntry(1521990233000L,
new LongDataEntry("test", 3L))).get();
timeseriesService.save(device.getId(), new BasicTsKvEntry(1524668633000L,
new LongDataEntry("test", 4L))).get();
timeseriesService.save(device.getId(), new BasicTsKvEntry(1527260633000L,
new LongDataEntry("test", 5L))).get();
} catch (Exception e) {
throw handleException(e);
}
}
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/device/testDelete", method = RequestMethod.GET)
@ResponseBody
public void testDelete() throws ThingsboardException {
try {
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, "Test");
long startTs = 1519561033000L;
long endTs = 1528260633000L;
timeseriesService.remove(device.getId(), Collections.singletonList(new BaseTsKvQuery("test",
startTs, endTs, endTs - startTs, 0, Aggregation.NONE, "DESC", true))).get();
} catch (Exception e) {
throw handleException(e);
}
}
} }

View File

@ -300,14 +300,27 @@ public class JpaTimeseriesDao extends JpaAbstractDaoListeningExecutorService imp
@Override @Override
public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) { public ListenableFuture<Void> remove(EntityId entityId, TsKvQuery query) {
//TODO: implement return insertService.submit(() -> {
return null; tsKvRepository.delete(
fromTimeUUID(entityId.getId()),
entityId.getEntityType(),
query.getKey(),
query.getStartTs(),
query.getEndTs());
return null;
});
} }
@Override @Override
public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) { public ListenableFuture<Void> removeLatest(EntityId entityId, TsKvQuery query) {
//TODO: implement TsKvLatestEntity latestEntity = new TsKvLatestEntity();
return null; latestEntity.setEntityType(entityId.getEntityType());
latestEntity.setEntityId(fromTimeUUID(entityId.getId()));
latestEntity.setKey(query.getKey());
return insertService.submit(() -> {
tsKvLatestRepository.delete(latestEntity);
return null;
});
} }
@Override @Override

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.dao.sql.timeseries; package org.thingsboard.server.dao.sql.timeseries;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query; import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param; import org.springframework.data.repository.query.Param;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.TsKvCompositeKey; import org.thingsboard.server.dao.model.sql.TsKvCompositeKey;
import org.thingsboard.server.dao.model.sql.TsKvEntity; import org.thingsboard.server.dao.model.sql.TsKvEntity;
@ -41,6 +43,17 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
@Param("endTs") long endTs, @Param("endTs") long endTs,
Pageable pageable); Pageable pageable);
@Transactional
@Modifying
@Query("DELETE FROM TsKvEntity tskv WHERE tskv.entityId = :entityId " +
"AND tskv.entityType = :entityType AND tskv.key = :entityKey " +
"AND tskv.ts > :startTs AND tskv.ts < :endTs")
void delete(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType,
@Param("entityKey") String key,
@Param("startTs") long startTs,
@Param("endTs") long endTs);
@Async @Async
@Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " + @Query("SELECT new TsKvEntity(MAX(tskv.strValue), MAX(tskv.longValue), MAX(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
@ -56,30 +69,30 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId, CompletableFuture<TsKvEntity> findMin(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType, @Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey, @Param("entityKey") String entityKey,
@Param("startTs") long startTs, @Param("startTs") long startTs,
@Param("endTs") long endTs); @Param("endTs") long endTs);
@Async @Async
@Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " + @Query("SELECT new TsKvEntity(COUNT(tskv.booleanValue), COUNT(tskv.strValue), COUNT(tskv.longValue), COUNT(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId, CompletableFuture<TsKvEntity> findCount(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType, @Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey, @Param("entityKey") String entityKey,
@Param("startTs") long startTs, @Param("startTs") long startTs,
@Param("endTs") long endTs); @Param("endTs") long endTs);
@Async @Async
@Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " + @Query("SELECT new TsKvEntity(AVG(tskv.longValue), AVG(tskv.doubleValue)) FROM TsKvEntity tskv " +
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId, CompletableFuture<TsKvEntity> findAvg(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType, @Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey, @Param("entityKey") String entityKey,
@Param("startTs") long startTs, @Param("startTs") long startTs,
@Param("endTs") long endTs); @Param("endTs") long endTs);
@Async @Async
@ -87,8 +100,8 @@ public interface TsKvRepository extends CrudRepository<TsKvEntity, TsKvComposite
"WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " + "WHERE tskv.entityId = :entityId AND tskv.entityType = :entityType " +
"AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs") "AND tskv.key = :entityKey AND tskv.ts > :startTs AND tskv.ts < :endTs")
CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId, CompletableFuture<TsKvEntity> findSum(@Param("entityId") String entityId,
@Param("entityType") EntityType entityType, @Param("entityType") EntityType entityType,
@Param("entityKey") String entityKey, @Param("entityKey") String entityKey,
@Param("startTs") long startTs, @Param("startTs") long startTs,
@Param("endTs") long endTs); @Param("endTs") long endTs);
} }

View File

@ -441,7 +441,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
} }
private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) { private ListenableFuture<Void> deleteLatest(EntityId entityId, String key) {
Statement delete = QueryBuilder.delete().from(ModelConstants.TS_KV_LATEST_CF) Statement delete = QueryBuilder.delete().all().from(ModelConstants.TS_KV_LATEST_CF)
.where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType())) .where(eq(ModelConstants.ENTITY_TYPE_COLUMN, entityId.getEntityType()))
.and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId())) .and(eq(ModelConstants.ENTITY_ID_COLUMN, entityId.getId()))
.and(eq(ModelConstants.KEY_COLUMN, key)); .and(eq(ModelConstants.KEY_COLUMN, key));
@ -453,25 +453,36 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) { public ListenableFuture<Void> removePartition(EntityId entityId, TsKvQuery query) {
long minPartition = toPartitionTs(query.getStartTs()); long minPartition = toPartitionTs(query.getStartTs());
long maxPartition = toPartitionTs(query.getEndTs()); long maxPartition = toPartitionTs(query.getEndTs());
if (minPartition == maxPartition) {
return Futures.immediateFuture(null);
} else {
ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition);
ResultSetFuture partitionsFuture = fetchPartitions(entityId, query.getKey(), minPartition, maxPartition); final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor);
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() {
final ListenableFuture<List<Long>> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); @Override
public void onSuccess(@Nullable List<Long> partitions) {
int index = 0;
if (minPartition != query.getStartTs()) {
index = 1;
}
List<Long> partitionsToDelete = new ArrayList<>();
for (int i = index; i < partitions.size() - 1; i++) {
partitionsToDelete.add(partitions.get(i));
}
TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
deletePartitionAsync(cursor, resultFuture);
}
Futures.addCallback(partitionsListFuture, new FutureCallback<List<Long>>() { @Override
@Override public void onFailure(Throwable t) {
public void onSuccess(@Nullable List<Long> partitions) { log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
TsKvQueryCursor cursor = new TsKvQueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitions); }
deletePartitionAsync(cursor, resultFuture); }, readResultsProcessingExecutor);
} return resultFuture;
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to fetch partitions for interval {}-{}", entityId.getEntityType().name(), entityId.getId(), minPartition, maxPartition, t);
}
}, readResultsProcessingExecutor);
return resultFuture;
} }
private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) { private void deletePartitionAsync(final TsKvQueryCursor cursor, final SimpleListenableFuture<Void> resultFuture) {

View File

@ -93,24 +93,27 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0)); Assert.assertEquals(toTsEntry(TS, stringKvEntry), entries.get(0));
} }
//TODO: sql delete implement @Test
/*@Test
public void testDeleteDeviceTsData() throws Exception { public void testDeleteDeviceTsData() throws Exception {
DeviceId deviceId = new DeviceId(UUIDs.timeBased()); DeviceId deviceId = new DeviceId(UUIDs.timeBased());
saveEntries(deviceId, TS - 4);
saveEntries(deviceId, TS - 3); saveEntries(deviceId, TS - 3);
saveEntries(deviceId, TS - 2); saveEntries(deviceId, TS - 2);
saveEntries(deviceId, TS - 1); saveEntries(deviceId, TS - 1);
saveEntries(deviceId, TS);
tsService.remove(deviceId, Collections.singletonList( tsService.remove(deviceId, Collections.singletonList(
new BaseTsKvQuery(STRING_KEY, TS - 4, TS - 2))).get(); new BaseTsKvQuery(STRING_KEY, TS - 4, TS, 60000, 0, Aggregation.NONE, DESC_ORDER,
false))).get();
List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList( List<TsKvEntry> list = tsService.findAll(deviceId, Collections.singletonList(
new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER))).get(); new BaseTsKvQuery(STRING_KEY, 0, 60000, 60000, 5, Aggregation.NONE, DESC_ORDER,
false))).get();
Assert.assertEquals(1, list.size());
Assert.assertEquals(2, list.size()); List<TsKvEntry> latest = tsService.findLatest(deviceId, Collections.singletonList(STRING_KEY)).get();
}*/ Assert.assertEquals(null, latest.get(0).getValueAsString());
}
@Test @Test
public void testFindDeviceTsData() throws Exception { public void testFindDeviceTsData() throws Exception {