Clean up cassandra partitions cache on remove
This commit is contained in:
		
							parent
							
								
									cfcfc3d7b5
								
							
						
					
					
						commit
						3881c809fe
					
				@ -254,6 +254,13 @@ public class CassandraBaseTimeseriesDao extends AbstractCassandraBaseTimeseriesD
 | 
			
		||||
                    }
 | 
			
		||||
                    QueryCursor cursor = new QueryCursor(entityId.getEntityType().name(), entityId.getId(), query, partitionsToDelete);
 | 
			
		||||
                    deletePartitionAsync(tenantId, cursor, resultFuture);
 | 
			
		||||
 | 
			
		||||
                    for (Long partition : partitionsToDelete) {
 | 
			
		||||
                        CassandraPartitionCacheKey key = new CassandraPartitionCacheKey(entityId, query.getKey(), partition);
 | 
			
		||||
                        if (cassandraTsPartitionsCache.has(key)) {
 | 
			
		||||
                            cassandraTsPartitionsCache.invalidate(key);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                @Override
 | 
			
		||||
 | 
			
		||||
@ -39,4 +39,8 @@ public class CassandraTsPartitionsCache {
 | 
			
		||||
    public void put(CassandraPartitionCacheKey key) {
 | 
			
		||||
        partitionsCache.put(key, CompletableFuture.completedFuture(true));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void invalidate(CassandraPartitionCacheKey key) {
 | 
			
		||||
        partitionsCache.synchronous().invalidate(key);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -606,6 +606,31 @@ public abstract class BaseTimeseriesServiceTest extends AbstractServiceTest {
 | 
			
		||||
        assertEquals(java.util.Optional.of(2L), list.get(2).getLongValue());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSaveTs_RemoveTs_AndSaveTsAgain() throws Exception {
 | 
			
		||||
        DeviceId deviceId = new DeviceId(Uuids.timeBased());
 | 
			
		||||
 | 
			
		||||
        save(deviceId, 2000000L, 95);
 | 
			
		||||
        save(deviceId, 4000000L, 100);
 | 
			
		||||
        save(deviceId, 6000000L, 105);
 | 
			
		||||
        List<TsKvEntry> list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0L,
 | 
			
		||||
                8000000L, 200000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
        assertEquals(3, list.size());
 | 
			
		||||
 | 
			
		||||
        tsService.remove(tenantId, deviceId, Collections.singletonList(
 | 
			
		||||
                new BaseDeleteTsKvQuery(LONG_KEY, 0L, 8000000L, false))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0L,
 | 
			
		||||
                8000000L, 200000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
        assertEquals(0, list.size());
 | 
			
		||||
 | 
			
		||||
        save(deviceId, 2000000L, 99);
 | 
			
		||||
        save(deviceId, 4000000L, 104);
 | 
			
		||||
        save(deviceId, 6000000L, 109);
 | 
			
		||||
        list = tsService.findAll(tenantId, deviceId, Collections.singletonList(new BaseReadTsKvQuery(LONG_KEY, 0L,
 | 
			
		||||
                8000000L, 200000, 3, Aggregation.NONE))).get(MAX_TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
        assertEquals(3, list.size());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private TsKvEntry save(DeviceId deviceId, long ts, long value) throws Exception {
 | 
			
		||||
        TsKvEntry entry = new BasicTsKvEntry(ts, new LongDataEntry(LONG_KEY, value));
 | 
			
		||||
        tsService.save(tenantId, deviceId, entry).get(MAX_TIMEOUT, TimeUnit.SECONDS);
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user