added deleteLatest api

This commit is contained in:
YevhenBondarenko 2023-07-12 19:48:25 +02:00
parent 7186f30c60
commit 4b7cc4571d
9 changed files with 257 additions and 14 deletions

View File

@ -201,7 +201,7 @@ public class TelemetryController extends BaseController {
@ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr, @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr,
@ApiParam(value = ATTRIBUTES_SCOPE_DESCRIPTION, required = true, allowableValues = ATTRIBUTES_SCOPE_ALLOWED_VALUES) @PathVariable("scope") String scope) throws ThingsboardException { @ApiParam(value = ATTRIBUTES_SCOPE_DESCRIPTION, required = true, allowableValues = ATTRIBUTES_SCOPE_ALLOWED_VALUES) @PathVariable("scope") String scope) throws ThingsboardException {
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr,
(result, tenantId, entityId) -> getAttributeKeysCallback(result, tenantId, entityId, scope)); (result, tenantId, entityId) -> getAttributeKeysCallback(result, tenantId, entityId, scope));
} }
@ApiOperation(value = "Get attributes (getAttributes)", @ApiOperation(value = "Get attributes (getAttributes)",
@ -219,9 +219,9 @@ public class TelemetryController extends BaseController {
@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType, @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType,
@ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr, @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr,
@ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { @ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
SecurityUser user = getCurrentUser(); SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr,
(result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr)); (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr));
} }
@ -245,7 +245,7 @@ public class TelemetryController extends BaseController {
@ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { @ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException {
SecurityUser user = getCurrentUser(); SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr,
(result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr)); (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr));
} }
@ApiOperation(value = "Get time-series keys (getTimeseriesKeys)", @ApiOperation(value = "Get time-series keys (getTimeseriesKeys)",
@ -259,7 +259,7 @@ public class TelemetryController extends BaseController {
@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType, @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType,
@ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr) throws ThingsboardException { @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
(result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor())); (result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor()));
} }
@ApiOperation(value = "Get latest time-series value (getLatestTimeseries)", @ApiOperation(value = "Get latest time-series value (getLatestTimeseries)",
@ -487,13 +487,15 @@ public class TelemetryController extends BaseController {
@ApiParam(value = "A long value representing the end timestamp of removal time range in milliseconds.") @ApiParam(value = "A long value representing the end timestamp of removal time range in milliseconds.")
@RequestParam(name = "endTs", required = false) Long endTs, @RequestParam(name = "endTs", required = false) Long endTs,
@ApiParam(value = "If the parameter is set to true, the latest telemetry will be rewritten in case that current latest value was removed, otherwise, in case that parameter is set to false the new latest value will not set.") @ApiParam(value = "If the parameter is set to true, the latest telemetry will be rewritten in case that current latest value was removed, otherwise, in case that parameter is set to false the new latest value will not set.")
@RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException { @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted,
@ApiParam(value = "If the parameter is set to true, the latest telemetry can be removed, otherwise, in case that parameter is set to false the latest value will not removed.")
@RequestParam(name = "deleteLatest", required = false, defaultValue = "true") boolean deleteLatest) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted); return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted, deleteLatest);
} }
private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys, private DeferredResult<ResponseEntity> deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys,
Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException { Long startTs, Long endTs, boolean rewriteLatestIfDeleted, boolean deleteLatest) throws ThingsboardException {
List<String> keys = toKeysList(keysStr); List<String> keys = toKeysList(keysStr);
if (keys.isEmpty()) { if (keys.isEmpty()) {
return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
@ -517,7 +519,7 @@ public class TelemetryController extends BaseController {
return accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entityIdStr, (result, tenantId, entityId) -> { return accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entityIdStr, (result, tenantId, entityId) -> {
List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>(); List<DeleteTsKvQuery> deleteTsKvQueries = new ArrayList<>();
for (String key : keys) { for (String key : keys) {
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted, deleteLatest));
} }
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() {
@Override @Override
@ -535,6 +537,55 @@ public class TelemetryController extends BaseController {
}); });
} }
@ApiOperation(value = "Delete entity latest time-series data (deleteEntityLatestTimeseries)",
notes = "Delete latest time-series for selected entity based on entity id, entity type and keys. " +
TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH,
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Timeseries for the selected keys in the request was removed. " +
"Platform creates an audit log event about entity latest timeseries removal with action type 'TIMESERIES_DELETED'."),
@ApiResponse(code = 400, message = "Platform returns a bad request in case if keys list is empty."),
@ApiResponse(code = 401, message = "User is not authorized to delete entity latest timeseries for selected entity. Most likely, User belongs to different Customer or Tenant."),
@ApiResponse(code = 500, message = "The exception was thrown during processing the request. " +
"Platform creates an audit log event about entity latest timeseries removal with action type 'TIMESERIES_DELETED' that includes an error stacktrace."),
})
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/{entityType}/{entityId}/timeseries/latest/delete", method = RequestMethod.DELETE)
@ResponseBody
public DeferredResult<ResponseEntity> deleteEntityLatestTimeseries(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE")
@PathVariable("entityType") String entityType,
@ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable("entityId") String entityIdStr,
@ApiParam(value = TELEMETRY_KEYS_DESCRIPTION, required = true)
@RequestParam(name = "keys") String keysStr) throws ThingsboardException {
EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr);
return deleteLatestTimeseries(entityId, keysStr);
}
private DeferredResult<ResponseEntity> deleteLatestTimeseries(EntityId entityIdStr, String keysStr) throws ThingsboardException {
List<String> keys = toKeysList(keysStr);
if (keys.isEmpty()) {
return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST);
}
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entityIdStr, (result, tenantId, entityId) ->
tsSubService.deleteLatestAndNotify(tenantId, entityId, keys, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
logLatestTimeseriesDeleted(user, entityId, keys, null);
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
@Override
public void onFailure(Throwable t) {
logLatestTimeseriesDeleted(user, entityId, keys, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
})
);
}
@ApiOperation(value = "Delete device attributes (deleteDeviceAttributes)", @ApiOperation(value = "Delete device attributes (deleteDeviceAttributes)",
notes = "Delete device attributes using provided Device Id, scope and a list of keys. " + notes = "Delete device attributes using provided Device Id, scope and a list of keys. " +
"Referencing a non-existing Device Id will cause an error" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH, "Referencing a non-existing Device Id will cause an error" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH,
@ -827,6 +878,11 @@ public class TelemetryController extends BaseController {
toException(e), keys, startTs, endTs); toException(e), keys, startTs, endTs);
} }
private void logLatestTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
notificationEntityService.logEntityAction(user.getTenantId(), entityId, ActionType.TIMESERIES_DELETED, user,
toException(e), keys);
}
private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List<TsKvEntry> telemetry, Throwable e) { private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List<TsKvEntry> telemetry, Throwable e) {
notificationEntityService.logEntityAction(user.getTenantId(), entityId, ActionType.TIMESERIES_UPDATED, user, notificationEntityService.logEntityAction(user.getTenantId(), entityId, ActionType.TIMESERIES_UPDATED, user,
toException(e), telemetry); toException(e), telemetry);

View File

@ -316,6 +316,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
} }
@Override
public void deleteLatestAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
}
@Override @Override
public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) { public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)

View File

@ -792,6 +792,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass); return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass);
} }
protected <T> T doDeleteAsync(String urlTemplate, Class<T> responseClass, String... params) throws Exception {
return readResponse(doDeleteAsync(urlTemplate, DEFAULT_TIMEOUT, params).andExpect(status().isOk()), responseClass);
}
protected ResultActions doPost(String urlTemplate, String... params) throws Exception { protected ResultActions doPost(String urlTemplate, String... params) throws Exception {
MockHttpServletRequestBuilder postRequest = post(urlTemplate); MockHttpServletRequestBuilder postRequest = post(urlTemplate);
setJwtToken(postRequest); setJwtToken(postRequest);
@ -824,6 +828,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return mockMvc.perform(deleteRequest); return mockMvc.perform(deleteRequest);
} }
protected ResultActions doDeleteAsync(String urlTemplate, Long timeout, String... params) throws Exception {
MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate, params);
setJwtToken(deleteRequest);
// populateParams(deleteRequest, params);
MvcResult result = mockMvc.perform(deleteRequest).andReturn();
result.getAsyncResult(timeout);
return mockMvc.perform(asyncDispatch(result));
}
protected void populateParams(MockHttpServletRequestBuilder request, String... params) { protected void populateParams(MockHttpServletRequestBuilder request, String... params) {
if (params != null && params.length > 0) { if (params != null && params.length > 0) {
Assert.assertEquals(0, params.length % 2); Assert.assertEquals(0, params.length % 2);

View File

@ -15,14 +15,21 @@
*/ */
package org.thingsboard.server.controller; package org.thingsboard.server.controller;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.List;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.query.EntityKeyType.TIME_SERIES;
@DaoSqlTest @DaoSqlTest
public class TelemetryControllerTest extends AbstractControllerTest { public class TelemetryControllerTest extends AbstractControllerTest {
@ -39,6 +46,136 @@ public class TelemetryControllerTest extends AbstractControllerTest {
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest()); doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest());
} }
@Test
public void testDeleteLatest() throws Exception {
loginTenantAdmin();
Device device = createDevice();
SingleEntityFilter filter = new SingleEntityFilter();
filter.setSingleEntity(device.getId());
getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter);
getWsClient().registerWaitForUpdate(1);
long startTs = System.currentTimeMillis();
String testBody = "{\"data\": \"value\"}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk());
long endTs = System.currentTimeMillis();
ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertNotNull(latest);
var data = latest.get("data");
Assert.assertNotNull(data);
Assert.assertEquals("value", data.get(0).get("value").asText());
ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertNotNull(timeseries);
Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText());
doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/latest/delete?keys=data", String.class);
latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertTrue(latest.get("data").get(0).get("value").isNull());
timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText());
}
@Test
public void testDeleteAllTelemetryWithLatest() throws Exception {
loginTenantAdmin();
Device device = createDevice();
SingleEntityFilter filter = new SingleEntityFilter();
filter.setSingleEntity(device.getId());
getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter);
getWsClient().registerWaitForUpdate(1);
long startTs = System.currentTimeMillis();
String testBody = "{\"data\": \"value\"}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk());
long endTs = System.currentTimeMillis();
ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertNotNull(latest);
var data = latest.get("data");
Assert.assertNotNull(data);
Assert.assertEquals("value", data.get(0).get("value").asText());
ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertNotNull(timeseries);
Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText());
doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/delete?keys=data&deleteAllDataForKeys=true", String.class);
latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertTrue(latest.get("data").get(0).get("value").isNull());
timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertTrue(timeseries.isEmpty());
}
@Test
public void testDeleteAllTelemetryWithoutLatest() throws Exception {
loginTenantAdmin();
Device device = createDevice();
SingleEntityFilter filter = new SingleEntityFilter();
filter.setSingleEntity(device.getId());
getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter);
getWsClient().registerWaitForUpdate(1);
long startTs = System.currentTimeMillis();
String testBody = "{\"data\": \"value\"}";
doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk());
long endTs = System.currentTimeMillis();
ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertNotNull(latest);
Assert.assertEquals("value", latest.get("data").get(0).get("value").asText());
ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertNotNull(timeseries);
Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText());
doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/delete?keys=data&deleteAllDataForKeys=true&deleteLatest=false", String.class);
latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class);
Assert.assertEquals("value", latest.get("data").get(0).get("value").asText());
timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs);
Assert.assertTrue(timeseries.isEmpty());
}
private Device createDevice() throws Exception { private Device createDevice() throws Exception {
String testToken = "TEST_TOKEN"; String testToken = "TEST_TOKEN";

View File

@ -21,14 +21,20 @@ import lombok.Data;
public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery { public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery {
private final Boolean rewriteLatestIfDeleted; private final Boolean rewriteLatestIfDeleted;
private final Boolean deleteLatest;
public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) { public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted, boolean deleteLatest) {
super(key, startTs, endTs); super(key, startTs, endTs);
this.rewriteLatestIfDeleted = rewriteLatestIfDeleted; this.rewriteLatestIfDeleted = rewriteLatestIfDeleted;
this.deleteLatest = deleteLatest;
}
public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) {
this(key, startTs, endTs, rewriteLatestIfDeleted, true);
} }
public BaseDeleteTsKvQuery(String key, long startTs, long endTs) { public BaseDeleteTsKvQuery(String key, long startTs, long endTs) {
this(key, startTs, endTs, false); this(key, startTs, endTs, false, true);
} }
} }

View File

@ -19,4 +19,6 @@ public interface DeleteTsKvQuery extends TsKvQuery {
Boolean getRewriteLatestIfDeleted(); Boolean getRewriteLatestIfDeleted();
Boolean getDeleteLatest();
} }

View File

@ -275,7 +275,9 @@ public class BaseTimeseriesService implements TimeseriesService {
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) { private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); if (query.getDeleteLatest()) {
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
}
} }
private static void validate(EntityId entityId) { private static void validate(EntityId entityId) {

View File

@ -2364,7 +2364,8 @@ public class RestClient implements Closeable {
boolean deleteAllDataForKeys, boolean deleteAllDataForKeys,
Long startTs, Long startTs,
Long endTs, Long endTs,
boolean rewriteLatestIfDeleted) { boolean rewriteLatestIfDeleted,
boolean deleteLatest) {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("entityType", entityId.getEntityType().name()); params.put("entityType", entityId.getEntityType().name());
params.put("entityId", entityId.getId().toString()); params.put("entityId", entityId.getId().toString());
@ -2373,17 +2374,34 @@ public class RestClient implements Closeable {
params.put("startTs", startTs.toString()); params.put("startTs", startTs.toString());
params.put("endTs", endTs.toString()); params.put("endTs", endTs.toString());
params.put("rewriteLatestIfDeleted", String.valueOf(rewriteLatestIfDeleted)); params.put("rewriteLatestIfDeleted", String.valueOf(rewriteLatestIfDeleted));
params.put("deleteLatest", String.valueOf(deleteLatest));
return restTemplate return restTemplate
.exchange( .exchange(
baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/delete?keys={keys}&deleteAllDataForKeys={deleteAllDataForKeys}&startTs={startTs}&endTs={endTs}&rewriteLatestIfDeleted={rewriteLatestIfDeleted}", baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/delete?keys={keys}&deleteAllDataForKeys={deleteAllDataForKeys}&startTs={startTs}&endTs={endTs}&rewriteLatestIfDeleted={rewriteLatestIfDeleted}&deleteLatest={deleteLatest}",
HttpMethod.DELETE, HttpMethod.DELETE,
HttpEntity.EMPTY, HttpEntity.EMPTY,
Object.class, Object.class,
params) params)
.getStatusCode() .getStatusCode()
.is2xxSuccessful(); .is2xxSuccessful();
}
public boolean deleteEntityLatestTimeseries(EntityId entityId, List<String> keys) {
Map<String, String> params = new HashMap<>();
params.put("entityType", entityId.getEntityType().name());
params.put("entityId", entityId.getId().toString());
params.put("keys", listToString(keys));
return restTemplate
.exchange(
baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/latest/delete?keys={keys}",
HttpMethod.DELETE,
HttpEntity.EMPTY,
Object.class,
params)
.getStatusCode()
.is2xxSuccessful();
} }
public boolean deleteEntityAttributes(DeviceId deviceId, String scope, List<String> keys) { public boolean deleteEntityAttributes(DeviceId deviceId, String scope, List<String> keys) {

View File

@ -71,4 +71,6 @@ public interface RuleEngineTelemetryService {
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback); void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback); void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
void deleteLatestAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
} }