diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index a518eaa1af..fa69c99245 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -49,6 +49,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.AttributeScope; @@ -521,19 +522,25 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted, deleteLatest)); } - tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) { - logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); - result.setResult(new ResponseEntity<>(HttpStatus.OK)); - } + tsSubService.deleteTimeseries(TimeseriesDeleteRequest.builder() + .tenantId(tenantId) + .entityId(entityId) + .keys(keys) + .deleteHistoryQueries(deleteTsKvQueries) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List tmp) { + logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); + result.setResult(new ResponseEntity<>(HttpStatus.OK)); + } - @Override - public void onFailure(Throwable t) { - logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); - result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); - } - }); + @Override + public void onFailure(Throwable t) { + logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }) + .build()); }); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java index 7dffb43cea..686e85cb1e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.Customer; @@ -58,6 +59,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -403,51 +405,32 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen private ListenableFuture deleteLatestFromEntityView(EntityView entityView, List keys, User user) { EntityViewId entityId = entityView.getId(); SettableFuture resultFuture = SettableFuture.create(); - if (keys != null && !keys.isEmpty()) { - tsSubService.deleteLatest(entityView.getTenantId(), entityId, keys, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - try { - logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, null); - } catch (ThingsboardException e) { - log.error("Failed to log timeseries delete", e); + tsSubService.deleteTimeseries(TimeseriesDeleteRequest.builder() + .tenantId(entityView.getTenantId()) + .entityId(entityId) + .keys(keys) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + try { + logTimeseriesDeleted(entityView.getTenantId(), user, entityId, result, null); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.set(null); } - resultFuture.set(tmp); - } - @Override - public void onFailure(Throwable t) { - try { - logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, t); - } catch (ThingsboardException e) { - log.error("Failed to log timeseries delete", e); + @Override + public void onFailure(Throwable t) { + try { + logTimeseriesDeleted(entityView.getTenantId(), user, entityId, Optional.ofNullable(keys).orElse(Collections.emptyList()), t); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.setException(t); } - resultFuture.setException(t); - } - }); - } else { - tsSubService.deleteAllLatest(entityView.getTenantId(), entityId, new FutureCallback>() { - @Override - public void onSuccess(@Nullable Collection keys) { - try { - logTimeseriesDeleted(entityView.getTenantId(), user, entityId, new ArrayList<>(keys), null); - } catch (ThingsboardException e) { - log.error("Failed to log timeseries delete", e); - } - resultFuture.set(null); - } - - @Override - public void onFailure(Throwable t) { - try { - logTimeseriesDeleted(entityView.getTenantId(), user, entityId, Collections.emptyList(), t); - } catch (ThingsboardException e) { - log.error("Failed to log timeseries delete", e); - } - resultFuture.setException(t); - } - }); - } + }) + .build()); return resultFuture; } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index ad7f963894..d4444d5d7f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -23,13 +23,16 @@ import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; @@ -38,7 +41,6 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -51,7 +53,6 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -60,6 +61,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; /** * Created by ashvayka on 27.03.18. @@ -177,38 +179,26 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { - checkInternalEntity(entityId); - deleteLatestInternal(tenantId, entityId, keys, callback); + public void deleteTimeseries(TimeseriesDeleteRequest request) { + checkInternalEntity(request.getEntityId()); + deleteTimeseriesInternal(request); } @Override - public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); - addMainCallback(deleteFuture, callback); - } - - @Override - public void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback) { - ListenableFuture> deleteFuture = tsService.removeAllLatest(tenantId, entityId); - Futures.addCallback(deleteFuture, new FutureCallback>() { - @Override - public void onSuccess(@Nullable Collection result) { - callback.onSuccess(result); + public void deleteTimeseriesInternal(TimeseriesDeleteRequest request) { + if (CollectionUtils.isNotEmpty(request.getKeys())) { + ListenableFuture> deleteFuture; + if (request.getDeleteHistoryQueries() == null) { + deleteFuture = tsService.removeLatest(request.getTenantId(), request.getEntityId(), request.getKeys()); + } else { + deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries()); + addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result)); } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }, tsCallBackExecutor); - } - - @Override - public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); - addMainCallback(deleteFuture, callback); - addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); + addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); + } else { + ListenableFuture> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId()); + addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure); + } } private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List ts) { @@ -314,17 +304,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { if (callback == null) return; - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable S result) { - callback.onSuccess(null); - } + addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure); + } - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }, tsCallBackExecutor); + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess, Consumer onFailure) { + DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor); } private void checkInternalEntity(EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 1db6a86507..8e45b84a75 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -15,16 +15,12 @@ */ package org.thingsboard.server.service.telemetry; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; - -import java.util.List; /** * Created by ashvayka on 27.03.18. @@ -35,8 +31,8 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { void saveAttributesInternal(AttributesSaveRequest request); + void deleteTimeseriesInternal(TimeseriesDeleteRequest request); + void deleteAttributesInternal(AttributesDeleteRequest request); - void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); - } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index eaba144042..49918f3823 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -56,7 +56,7 @@ public interface TimeseriesService { ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); - ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); + ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index f9c4218d64..756b73d88b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -254,11 +254,11 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId) { + public ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId) { validate(entityId); return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> { if (latest != null && !latest.isEmpty()) { - Collection keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()); + List keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()); return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor()); } else { return Futures.immediateFuture(Collections.emptyList()); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 17ed6aaa13..17696b4bb1 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -15,14 +15,6 @@ */ package org.thingsboard.rule.engine.api; -import com.google.common.util.concurrent.FutureCallback; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; - -import java.util.Collection; -import java.util.List; - /** * Created by ashvayka on 02.04.18. */ @@ -32,12 +24,8 @@ public interface RuleEngineTelemetryService { void saveAttributes(AttributesSaveRequest request); + void deleteTimeseries(TimeseriesDeleteRequest request); + void deleteAttributes(AttributesDeleteRequest request); - void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); - - void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - - void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); - } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java new file mode 100644 index 0000000000..b124806fff --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java @@ -0,0 +1,85 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FutureCallback; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; + +import java.util.List; + +@Getter +@ToString +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class TimeseriesDeleteRequest { + + private final TenantId tenantId; + private final EntityId entityId; + private final List keys; + private final List deleteHistoryQueries; + private final FutureCallback> callback; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private TenantId tenantId; + private EntityId entityId; + private List keys; + private List deleteHistoryQueries; + private FutureCallback> callback; + + Builder() {} + + public Builder tenantId(TenantId tenantId) { + this.tenantId = tenantId; + return this; + } + + public Builder entityId(EntityId entityId) { + this.entityId = entityId; + return this; + } + + public Builder keys(List keys) { + this.keys = keys; + return this; + } + + public Builder deleteHistoryQueries(List deleteHistoryQueries) { + this.deleteHistoryQueries = deleteHistoryQueries; + return this; + } + + public Builder callback(FutureCallback> callback) { + this.callback = callback; + return this; + } + + public TimeseriesDeleteRequest build() { + return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, callback); + } + + } + +}