Refactor delete for latest and history

This commit is contained in:
ViacheslavKlimov 2024-12-18 17:34:35 +02:00
parent eb7bc8695b
commit 823d89dca6
8 changed files with 161 additions and 118 deletions

View File

@ -49,6 +49,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
@ -521,9 +522,14 @@ public class TelemetryController extends BaseController {
for (String key : keys) { for (String key : keys) {
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted, deleteLatest)); deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted, deleteLatest));
} }
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { tsSubService.deleteTimeseries(TimeseriesDeleteRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.keys(keys)
.deleteHistoryQueries(deleteTsKvQueries)
.callback(new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable Void tmp) { public void onSuccess(@Nullable List<String> tmp) {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
result.setResult(new ResponseEntity<>(HttpStatus.OK)); result.setResult(new ResponseEntity<>(HttpStatus.OK));
} }
@ -533,7 +539,8 @@ public class TelemetryController extends BaseController {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
} }
}); })
.build());
}); });
} }

View File

@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap; import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
@ -58,6 +59,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -403,34 +405,15 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
private ListenableFuture<Void> deleteLatestFromEntityView(EntityView entityView, List<String> keys, User user) { private ListenableFuture<Void> deleteLatestFromEntityView(EntityView entityView, List<String> keys, User user) {
EntityViewId entityId = entityView.getId(); EntityViewId entityId = entityView.getId();
SettableFuture<Void> resultFuture = SettableFuture.create(); SettableFuture<Void> resultFuture = SettableFuture.create();
if (keys != null && !keys.isEmpty()) { tsSubService.deleteTimeseries(TimeseriesDeleteRequest.builder()
tsSubService.deleteLatest(entityView.getTenantId(), entityId, keys, new FutureCallback<Void>() { .tenantId(entityView.getTenantId())
.entityId(entityId)
.keys(keys)
.callback(new FutureCallback<>() {
@Override @Override
public void onSuccess(@Nullable Void tmp) { public void onSuccess(@Nullable List<String> result) {
try { try {
logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, null); logTimeseriesDeleted(entityView.getTenantId(), user, entityId, result, null);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.set(tmp);
}
@Override
public void onFailure(Throwable t) {
try {
logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, t);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.setException(t);
}
});
} else {
tsSubService.deleteAllLatest(entityView.getTenantId(), entityId, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> keys) {
try {
logTimeseriesDeleted(entityView.getTenantId(), user, entityId, new ArrayList<>(keys), null);
} catch (ThingsboardException e) { } catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e); log.error("Failed to log timeseries delete", e);
} }
@ -440,14 +423,14 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
try { try {
logTimeseriesDeleted(entityView.getTenantId(), user, entityId, Collections.emptyList(), t); logTimeseriesDeleted(entityView.getTenantId(), user, entityId, Optional.ofNullable(keys).orElse(Collections.emptyList()), t);
} catch (ThingsboardException e) { } catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e); log.error("Failed to log timeseries delete", e);
} }
resultFuture.setException(t); resultFuture.setException(t);
} }
}); })
} .build());
return resultFuture; return resultFuture;
} }

View File

@ -23,13 +23,16 @@ import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
@ -38,7 +41,6 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
@ -51,7 +53,6 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -60,6 +61,7 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer;
/** /**
* Created by ashvayka on 27.03.18. * Created by ashvayka on 27.03.18.
@ -177,38 +179,26 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
} }
@Override @Override
public void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) { public void deleteTimeseries(TimeseriesDeleteRequest request) {
checkInternalEntity(entityId); checkInternalEntity(request.getEntityId());
deleteLatestInternal(tenantId, entityId, keys, callback); deleteTimeseriesInternal(request);
} }
@Override @Override
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) { public void deleteTimeseriesInternal(TimeseriesDeleteRequest request) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); if (CollectionUtils.isNotEmpty(request.getKeys())) {
addMainCallback(deleteFuture, callback); ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture;
if (request.getDeleteHistoryQueries() == null) {
deleteFuture = tsService.removeLatest(request.getTenantId(), request.getEntityId(), request.getKeys());
} else {
deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries());
addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result));
} }
addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure);
@Override } else {
public void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback) { ListenableFuture<List<String>> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId());
ListenableFuture<Collection<String>> deleteFuture = tsService.removeAllLatest(tenantId, entityId); addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure);
Futures.addCallback(deleteFuture, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> result) {
callback.onSuccess(result);
} }
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
@Override
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
addMainCallback(deleteFuture, callback);
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
} }
private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) { private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
@ -314,17 +304,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) { private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
if (callback == null) return; if (callback == null) return;
Futures.addCallback(saveFuture, new FutureCallback<S>() { addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure);
@Override
public void onSuccess(@Nullable S result) {
callback.onSuccess(null);
} }
@Override private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess, Consumer<Throwable> onFailure) {
public void onFailure(Throwable t) { DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor);
callback.onFailure(t);
}
}, tsCallBackExecutor);
} }
private void checkInternalEntity(EntityId entityId) { private void checkInternalEntity(EntityId entityId) {

View File

@ -15,16 +15,12 @@
*/ */
package org.thingsboard.server.service.telemetry; package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
/** /**
* Created by ashvayka on 27.03.18. * Created by ashvayka on 27.03.18.
@ -35,8 +31,8 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
void saveAttributesInternal(AttributesSaveRequest request); void saveAttributesInternal(AttributesSaveRequest request);
void deleteTimeseriesInternal(TimeseriesDeleteRequest request);
void deleteAttributesInternal(AttributesDeleteRequest request); void deleteAttributesInternal(AttributesDeleteRequest request);
void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
} }

View File

@ -56,7 +56,7 @@ public interface TimeseriesService {
ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys); ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId); ListenableFuture<List<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

View File

@ -254,11 +254,11 @@ public class BaseTimeseriesService implements TimeseriesService {
} }
@Override @Override
public ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId) { public ListenableFuture<List<String>> removeAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId); validate(entityId);
return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> { return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> {
if (latest != null && !latest.isEmpty()) { if (latest != null && !latest.isEmpty()) {
Collection<String> keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()); List<String> keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList());
return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor()); return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor());
} else { } else {
return Futures.immediateFuture(Collections.emptyList()); return Futures.immediateFuture(Collections.emptyList());

View File

@ -15,14 +15,6 @@
*/ */
package org.thingsboard.rule.engine.api; package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import java.util.Collection;
import java.util.List;
/** /**
* Created by ashvayka on 02.04.18. * Created by ashvayka on 02.04.18.
*/ */
@ -32,12 +24,8 @@ public interface RuleEngineTelemetryService {
void saveAttributes(AttributesSaveRequest request); void saveAttributes(AttributesSaveRequest request);
void deleteTimeseries(TimeseriesDeleteRequest request);
void deleteAttributes(AttributesDeleteRequest request); void deleteAttributes(AttributesDeleteRequest request);
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
} }

View File

@ -0,0 +1,85 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import java.util.List;
@Getter
@ToString
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TimeseriesDeleteRequest {
private final TenantId tenantId;
private final EntityId entityId;
private final List<String> keys;
private final List<DeleteTsKvQuery> deleteHistoryQueries;
private final FutureCallback<List<String>> callback;
public static Builder builder() {
return new Builder();
}
public static class Builder {
private TenantId tenantId;
private EntityId entityId;
private List<String> keys;
private List<DeleteTsKvQuery> deleteHistoryQueries;
private FutureCallback<List<String>> callback;
Builder() {}
public Builder tenantId(TenantId tenantId) {
this.tenantId = tenantId;
return this;
}
public Builder entityId(EntityId entityId) {
this.entityId = entityId;
return this;
}
public Builder keys(List<String> keys) {
this.keys = keys;
return this;
}
public Builder deleteHistoryQueries(List<DeleteTsKvQuery> deleteHistoryQueries) {
this.deleteHistoryQueries = deleteHistoryQueries;
return this;
}
public Builder callback(FutureCallback<List<String>> callback) {
this.callback = callback;
return this;
}
public TimeseriesDeleteRequest build() {
return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, callback);
}
}
}