From 4b7cc4571dbe91d27cb0428d09b90603ae70410c Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 12 Jul 2023 19:48:25 +0200 Subject: [PATCH] added deleteLatest api --- .../controller/TelemetryController.java | 74 ++++++++-- .../DefaultTelemetrySubscriptionService.java | 7 + .../server/controller/AbstractWebTest.java | 13 ++ .../controller/TelemetryControllerTest.java | 137 ++++++++++++++++++ .../common/data/kv/BaseDeleteTsKvQuery.java | 10 +- .../common/data/kv/DeleteTsKvQuery.java | 2 + .../dao/timeseries/BaseTimeseriesService.java | 4 +- .../thingsboard/rest/client/RestClient.java | 22 ++- .../api/RuleEngineTelemetryService.java | 2 + 9 files changed, 257 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 05f56e93bd..8d94fb22cf 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -201,7 +201,7 @@ public class TelemetryController extends BaseController { @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr, @ApiParam(value = ATTRIBUTES_SCOPE_DESCRIPTION, required = true, allowableValues = ATTRIBUTES_SCOPE_ALLOWED_VALUES) @PathVariable("scope") String scope) throws ThingsboardException { return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, - (result, tenantId, entityId) -> getAttributeKeysCallback(result, tenantId, entityId, scope)); + (result, tenantId, entityId) -> getAttributeKeysCallback(result, tenantId, entityId, scope)); } @ApiOperation(value = "Get attributes (getAttributes)", @@ -219,9 +219,9 @@ public class TelemetryController extends BaseController { @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType, @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr, @ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { - SecurityUser user = getCurrentUser(); + SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, - (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr)); + (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, null, keysStr)); } @@ -245,7 +245,7 @@ public class TelemetryController extends BaseController { @ApiParam(value = ATTRIBUTES_KEYS_DESCRIPTION) @RequestParam(name = "keys", required = false) String keysStr) throws ThingsboardException { SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_ATTRIBUTES, entityType, entityIdStr, - (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr)); + (result, tenantId, entityId) -> getAttributeValuesCallback(result, user, entityId, scope, keysStr)); } @ApiOperation(value = "Get time-series keys (getTimeseriesKeys)", @@ -259,7 +259,7 @@ public class TelemetryController extends BaseController { @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") @PathVariable("entityType") String entityType, @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr) throws ThingsboardException { return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, - (result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor())); + (result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor())); } @ApiOperation(value = "Get latest time-series value (getLatestTimeseries)", @@ -487,13 +487,15 @@ public class TelemetryController extends BaseController { @ApiParam(value = "A long value representing the end timestamp of removal time range in milliseconds.") @RequestParam(name = "endTs", required = false) Long endTs, @ApiParam(value = "If the parameter is set to true, the latest telemetry will be rewritten in case that current latest value was removed, otherwise, in case that parameter is set to false the new latest value will not set.") - @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted) throws ThingsboardException { + @RequestParam(name = "rewriteLatestIfDeleted", defaultValue = "false") boolean rewriteLatestIfDeleted, + @ApiParam(value = "If the parameter is set to true, the latest telemetry can be removed, otherwise, in case that parameter is set to false the latest value will not removed.") + @RequestParam(name = "deleteLatest", required = false, defaultValue = "true") boolean deleteLatest) throws ThingsboardException { EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); - return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted); + return deleteTimeseries(entityId, keysStr, deleteAllDataForKeys, startTs, endTs, rewriteLatestIfDeleted, deleteLatest); } private DeferredResult deleteTimeseries(EntityId entityIdStr, String keysStr, boolean deleteAllDataForKeys, - Long startTs, Long endTs, boolean rewriteLatestIfDeleted) throws ThingsboardException { + Long startTs, Long endTs, boolean rewriteLatestIfDeleted, boolean deleteLatest) throws ThingsboardException { List keys = toKeysList(keysStr); if (keys.isEmpty()) { return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); @@ -517,7 +519,7 @@ public class TelemetryController extends BaseController { return accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entityIdStr, (result, tenantId, entityId) -> { List deleteTsKvQueries = new ArrayList<>(); for (String key : keys) { - deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); + deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted, deleteLatest)); } tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { @Override @@ -535,6 +537,55 @@ public class TelemetryController extends BaseController { }); } + @ApiOperation(value = "Delete entity latest time-series data (deleteEntityLatestTimeseries)", + notes = "Delete latest time-series for selected entity based on entity id, entity type and keys. " + + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH, + produces = MediaType.APPLICATION_JSON_VALUE) + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Timeseries for the selected keys in the request was removed. " + + "Platform creates an audit log event about entity latest timeseries removal with action type 'TIMESERIES_DELETED'."), + @ApiResponse(code = 400, message = "Platform returns a bad request in case if keys list is empty."), + @ApiResponse(code = 401, message = "User is not authorized to delete entity latest timeseries for selected entity. Most likely, User belongs to different Customer or Tenant."), + @ApiResponse(code = 500, message = "The exception was thrown during processing the request. " + + "Platform creates an audit log event about entity latest timeseries removal with action type 'TIMESERIES_DELETED' that includes an error stacktrace."), + }) + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/latest/delete", method = RequestMethod.DELETE) + @ResponseBody + public DeferredResult deleteEntityLatestTimeseries(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, defaultValue = "DEVICE") + @PathVariable("entityType") String entityType, + @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true) + @PathVariable("entityId") String entityIdStr, + @ApiParam(value = TELEMETRY_KEYS_DESCRIPTION, required = true) + @RequestParam(name = "keys") String keysStr) throws ThingsboardException { + EntityId entityId = EntityIdFactory.getByTypeAndId(entityType, entityIdStr); + return deleteLatestTimeseries(entityId, keysStr); + } + + private DeferredResult deleteLatestTimeseries(EntityId entityIdStr, String keysStr) throws ThingsboardException { + List keys = toKeysList(keysStr); + if (keys.isEmpty()) { + return getImmediateDeferredResult("Empty keys: " + keysStr, HttpStatus.BAD_REQUEST); + } + SecurityUser user = getCurrentUser(); + + return accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entityIdStr, (result, tenantId, entityId) -> + tsSubService.deleteLatestAndNotify(tenantId, entityId, keys, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) { + logLatestTimeseriesDeleted(user, entityId, keys, null); + result.setResult(new ResponseEntity<>(HttpStatus.OK)); + } + + @Override + public void onFailure(Throwable t) { + logLatestTimeseriesDeleted(user, entityId, keys, t); + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }) + ); + } + @ApiOperation(value = "Delete device attributes (deleteDeviceAttributes)", notes = "Delete device attributes using provided Device Id, scope and a list of keys. " + "Referencing a non-existing Device Id will cause an error" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH, @@ -827,6 +878,11 @@ public class TelemetryController extends BaseController { toException(e), keys, startTs, endTs); } + private void logLatestTimeseriesDeleted(SecurityUser user, EntityId entityId, List keys, Throwable e) { + notificationEntityService.logEntityAction(user.getTenantId(), entityId, ActionType.TIMESERIES_DELETED, user, + toException(e), keys); + } + private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List telemetry, Throwable e) { notificationEntityService.logEntityAction(user.getTenantId(), entityId, ActionType.TIMESERIES_UPDATED, user, toException(e), telemetry); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 3f5e52796a..40f4e3415b 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -316,6 +316,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); } + @Override + public void deleteLatestAndNotify(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); + addVoidCallback(deleteFuture, callback); + addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); + } + @Override public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback) { saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 7f57fbbb3f..b448e4fdb8 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -792,6 +792,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return readResponse(doDelete(urlTemplate, params).andExpect(status().isOk()), responseClass); } + protected T doDeleteAsync(String urlTemplate, Class responseClass, String... params) throws Exception { + return readResponse(doDeleteAsync(urlTemplate, DEFAULT_TIMEOUT, params).andExpect(status().isOk()), responseClass); + } + protected ResultActions doPost(String urlTemplate, String... params) throws Exception { MockHttpServletRequestBuilder postRequest = post(urlTemplate); setJwtToken(postRequest); @@ -824,6 +828,15 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return mockMvc.perform(deleteRequest); } + protected ResultActions doDeleteAsync(String urlTemplate, Long timeout, String... params) throws Exception { + MockHttpServletRequestBuilder deleteRequest = delete(urlTemplate, params); + setJwtToken(deleteRequest); +// populateParams(deleteRequest, params); + MvcResult result = mockMvc.perform(deleteRequest).andReturn(); + result.getAsyncResult(timeout); + return mockMvc.perform(asyncDispatch(result)); + } + protected void populateParams(MockHttpServletRequestBuilder request, String... params) { if (params != null && params.length > 0) { Assert.assertEquals(0, params.length % 2); diff --git a/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java index 47cac1b549..2fdab098e4 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TelemetryControllerTest.java @@ -15,14 +15,21 @@ */ package org.thingsboard.server.controller; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.Assert; import org.junit.Test; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.service.DaoSqlTest; +import java.util.List; + import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.query.EntityKeyType.TIME_SERIES; @DaoSqlTest public class TelemetryControllerTest extends AbstractControllerTest { @@ -39,6 +46,136 @@ public class TelemetryControllerTest extends AbstractControllerTest { doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", invalidRequestBody, String.class, status().isBadRequest()); } + @Test + public void testDeleteLatest() throws Exception { + loginTenantAdmin(); + Device device = createDevice(); + + SingleEntityFilter filter = new SingleEntityFilter(); + filter.setSingleEntity(device.getId()); + + getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter); + + getWsClient().registerWaitForUpdate(1); + + long startTs = System.currentTimeMillis(); + + String testBody = "{\"data\": \"value\"}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk()); + + long endTs = System.currentTimeMillis(); + + ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertNotNull(latest); + var data = latest.get("data"); + Assert.assertNotNull(data); + + Assert.assertEquals("value", data.get(0).get("value").asText()); + + ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertNotNull(timeseries); + + Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText()); + + doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/latest/delete?keys=data", String.class); + + latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertTrue(latest.get("data").get(0).get("value").isNull()); + + timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText()); + } + + @Test + public void testDeleteAllTelemetryWithLatest() throws Exception { + loginTenantAdmin(); + Device device = createDevice(); + + SingleEntityFilter filter = new SingleEntityFilter(); + filter.setSingleEntity(device.getId()); + + getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter); + + getWsClient().registerWaitForUpdate(1); + + long startTs = System.currentTimeMillis(); + + String testBody = "{\"data\": \"value\"}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk()); + + long endTs = System.currentTimeMillis(); + + ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertNotNull(latest); + var data = latest.get("data"); + Assert.assertNotNull(data); + + Assert.assertEquals("value", data.get(0).get("value").asText()); + + ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertNotNull(timeseries); + + Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText()); + + doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/delete?keys=data&deleteAllDataForKeys=true", String.class); + + latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertTrue(latest.get("data").get(0).get("value").isNull()); + + timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertTrue(timeseries.isEmpty()); + } + + @Test + public void testDeleteAllTelemetryWithoutLatest() throws Exception { + loginTenantAdmin(); + Device device = createDevice(); + + SingleEntityFilter filter = new SingleEntityFilter(); + filter.setSingleEntity(device.getId()); + + getWsClient().subscribeLatestUpdate(List.of(new EntityKey(TIME_SERIES, "data")), filter); + + getWsClient().registerWaitForUpdate(1); + + long startTs = System.currentTimeMillis(); + + String testBody = "{\"data\": \"value\"}"; + doPostAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/smth", testBody, String.class, status().isOk()); + + long endTs = System.currentTimeMillis(); + + ObjectNode latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertNotNull(latest); + + Assert.assertEquals("value", latest.get("data").get(0).get("value").asText()); + + ObjectNode timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertNotNull(timeseries); + + Assert.assertEquals("value", timeseries.get("data").get(0).get("value").asText()); + + doDeleteAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/timeseries/delete?keys=data&deleteAllDataForKeys=true&deleteLatest=false", String.class); + + latest = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data", ObjectNode.class); + + Assert.assertEquals("value", latest.get("data").get(0).get("value").asText()); + + timeseries = doGetAsync("/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/timeseries?keys=data&startTs={startTs}&endTs={endTs}", ObjectNode.class, startTs, endTs); + + Assert.assertTrue(timeseries.isEmpty()); + } + private Device createDevice() throws Exception { String testToken = "TEST_TOKEN"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java index dee0e7aa9b..a97cc8f834 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java @@ -21,14 +21,20 @@ import lombok.Data; public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuery { private final Boolean rewriteLatestIfDeleted; + private final Boolean deleteLatest; - public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) { + public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted, boolean deleteLatest) { super(key, startTs, endTs); this.rewriteLatestIfDeleted = rewriteLatestIfDeleted; + this.deleteLatest = deleteLatest; + } + + public BaseDeleteTsKvQuery(String key, long startTs, long endTs, boolean rewriteLatestIfDeleted) { + this(key, startTs, endTs, rewriteLatestIfDeleted, true); } public BaseDeleteTsKvQuery(String key, long startTs, long endTs) { - this(key, startTs, endTs, false); + this(key, startTs, endTs, false, true); } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java index 7b9b4ad16f..b2f41fffb4 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/DeleteTsKvQuery.java @@ -19,4 +19,6 @@ public interface DeleteTsKvQuery extends TsKvQuery { Boolean getRewriteLatestIfDeleted(); + Boolean getDeleteLatest(); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 34bcd15c0a..6b8bfa9d64 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -275,7 +275,9 @@ public class BaseTimeseriesService implements TimeseriesService { private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); - futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); + if (query.getDeleteLatest()) { + futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); + } } private static void validate(EntityId entityId) { diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 51eb446d70..8c8829a727 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -2364,7 +2364,8 @@ public class RestClient implements Closeable { boolean deleteAllDataForKeys, Long startTs, Long endTs, - boolean rewriteLatestIfDeleted) { + boolean rewriteLatestIfDeleted, + boolean deleteLatest) { Map params = new HashMap<>(); params.put("entityType", entityId.getEntityType().name()); params.put("entityId", entityId.getId().toString()); @@ -2373,17 +2374,34 @@ public class RestClient implements Closeable { params.put("startTs", startTs.toString()); params.put("endTs", endTs.toString()); params.put("rewriteLatestIfDeleted", String.valueOf(rewriteLatestIfDeleted)); + params.put("deleteLatest", String.valueOf(deleteLatest)); return restTemplate .exchange( - baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/delete?keys={keys}&deleteAllDataForKeys={deleteAllDataForKeys}&startTs={startTs}&endTs={endTs}&rewriteLatestIfDeleted={rewriteLatestIfDeleted}", + baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/delete?keys={keys}&deleteAllDataForKeys={deleteAllDataForKeys}&startTs={startTs}&endTs={endTs}&rewriteLatestIfDeleted={rewriteLatestIfDeleted}&deleteLatest={deleteLatest}", HttpMethod.DELETE, HttpEntity.EMPTY, Object.class, params) .getStatusCode() .is2xxSuccessful(); + } + public boolean deleteEntityLatestTimeseries(EntityId entityId, List keys) { + Map params = new HashMap<>(); + params.put("entityType", entityId.getEntityType().name()); + params.put("entityId", entityId.getId().toString()); + params.put("keys", listToString(keys)); + + return restTemplate + .exchange( + baseURL + "/api/plugins/telemetry/{entityType}/{entityId}/timeseries/latest/delete?keys={keys}", + HttpMethod.DELETE, + HttpEntity.EMPTY, + Object.class, + params) + .getStatusCode() + .is2xxSuccessful(); } public boolean deleteEntityAttributes(DeviceId deviceId, String scope, List keys) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index a61e83f48f..9acd03f665 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -71,4 +71,6 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); + + void deleteLatestAndNotify(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); }