improvements (not notify if lates ts has not been deleted)
This commit is contained in:
parent
b8d49b66a6
commit
6d61b249e5
@ -573,7 +573,7 @@ public class TelemetryController extends BaseController {
|
|||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
|
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
|
||||||
}
|
}
|
||||||
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, rewriteLatestIfDeleted, new FutureCallback<>() {
|
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Void tmp) {
|
public void onSuccess(@Nullable Void tmp) {
|
||||||
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
|
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
|
||||||
|
|||||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
@ -61,10 +62,8 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 27.03.18.
|
* Created by ashvayka on 27.03.18.
|
||||||
@ -255,7 +254,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
|
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
|
||||||
ListenableFuture<List<TsKvEntry>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
|
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
|
||||||
addVoidCallback(deleteFuture, callback);
|
addVoidCallback(deleteFuture, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -276,10 +275,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback<Void> callback) {
|
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
|
||||||
ListenableFuture<List<TsKvEntry>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
|
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
|
||||||
addVoidCallback(deleteFuture, callback);
|
addVoidCallback(deleteFuture, callback);
|
||||||
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list, rewriteLatestIfDeleted));
|
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -348,20 +347,24 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, List<TsKvEntry> ts, boolean rewriteLatestIfDeleted) {
|
private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, List<TsKvLatestRemovingResult> ts) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
||||||
if (currentPartitions.contains(tpi)) {
|
if (currentPartitions.contains(tpi)) {
|
||||||
if (subscriptionManagerService.isPresent()) {
|
if (subscriptionManagerService.isPresent()) {
|
||||||
Set<String> updated;
|
List<TsKvEntry> updated = new ArrayList<>();
|
||||||
if (rewriteLatestIfDeleted) {
|
List<String> deleted = new ArrayList<>();
|
||||||
List<TsKvEntry> filteredTs = ts.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
||||||
subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY);
|
|
||||||
updated = filteredTs.stream().map(TsKvEntry::getKey).collect(Collectors.toSet());
|
|
||||||
} else {
|
|
||||||
updated = Collections.emptySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<String> deleted = keys.stream().filter(key -> updated.isEmpty() || !updated.remove(key)).collect(Collectors.toList());
|
ts.stream().filter(Objects::nonNull).forEach(res -> {
|
||||||
|
if (res.isRemoved()) {
|
||||||
|
if (res.getData() != null) {
|
||||||
|
updated.add(res.getData());
|
||||||
|
} else {
|
||||||
|
deleted.add(res.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, updated, TbCallback.EMPTY);
|
||||||
subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY);
|
subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY);
|
||||||
} else {
|
} else {
|
||||||
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -43,9 +44,9 @@ public interface TimeseriesService {
|
|||||||
|
|
||||||
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
|
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
|
||||||
|
|
||||||
ListenableFuture<List<TsKvEntry>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
|
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
|
||||||
|
|
||||||
ListenableFuture<List<TsKvEntry>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
|
ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
|
||||||
|
|
||||||
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
|
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,36 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2021 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.kv;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class TsKvLatestRemovingResult {
|
||||||
|
private String key;
|
||||||
|
private TsKvEntry data;
|
||||||
|
private boolean removed;
|
||||||
|
|
||||||
|
public TsKvLatestRemovingResult(TsKvEntry data) {
|
||||||
|
this.key = data.getKey();
|
||||||
|
this.data = data;
|
||||||
|
this.removed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TsKvLatestRemovingResult(String key, boolean removed) {
|
||||||
|
this.key = key;
|
||||||
|
this.removed = removed;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -16,7 +16,6 @@
|
|||||||
package org.thingsboard.server.dao.sqlts;
|
package org.thingsboard.server.dao.sqlts;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
@ -34,6 +33,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
|||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.stats.StatsFactory;
|
import org.thingsboard.server.common.stats.StatsFactory;
|
||||||
import org.thingsboard.server.dao.DaoUtil;
|
import org.thingsboard.server.dao.DaoUtil;
|
||||||
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
|
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
|
||||||
@ -45,11 +45,9 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
|
|||||||
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
|
||||||
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
|
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
|
||||||
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
|
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
|
||||||
import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
|
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
|
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
|
||||||
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
|
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -59,7 +57,6 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ -147,7 +144,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
return getRemoveLatestFuture(tenantId, entityId, query);
|
return getRemoveLatestFuture(tenantId, entityId, query);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,16 +172,16 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
|
return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TsKvEntry> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
|
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
|
||||||
return Futures.transformAsync(future, entryList -> {
|
return Futures.transformAsync(future, entryList -> {
|
||||||
if (entryList.size() == 1) {
|
if (entryList.size() == 1) {
|
||||||
TsKvEntry entry = entryList.get(0);
|
TsKvEntry entry = entryList.get(0);
|
||||||
return Futures.transform(getSaveLatestFuture(entityId, entry), v -> entry, MoreExecutors.directExecutor());
|
return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
|
||||||
} else {
|
} else {
|
||||||
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true));
|
||||||
}, service);
|
}, service);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,7 +210,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
return Futures.immediateFuture(result);
|
return Futures.immediateFuture(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ListenableFuture<TsKvEntry> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
|
ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
|
||||||
|
|
||||||
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
|
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
|
||||||
@ -221,47 +218,25 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
|
|||||||
return ts > query.getStartTs() && ts <= query.getEndTs();
|
return ts > query.getStartTs() && ts <= query.getEndTs();
|
||||||
}, service);
|
}, service);
|
||||||
|
|
||||||
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
ListenableFuture<Boolean> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
||||||
if (isRemove) {
|
if (isRemove) {
|
||||||
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
|
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
|
||||||
latestEntity.setEntityId(entityId.getId());
|
latestEntity.setEntityId(entityId.getId());
|
||||||
latestEntity.setKey(getOrSaveKeyId(query.getKey()));
|
latestEntity.setKey(getOrSaveKeyId(query.getKey()));
|
||||||
return service.submit(() -> {
|
return service.submit(() -> {
|
||||||
tsKvLatestRepository.delete(latestEntity);
|
tsKvLatestRepository.delete(latestEntity);
|
||||||
return null;
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(false);
|
||||||
}, service);
|
}, service);
|
||||||
|
|
||||||
final SimpleListenableFuture<TsKvEntry> resultFuture = new SimpleListenableFuture<>();
|
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
|
||||||
Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
|
if (isRemoved && query.getRewriteLatestIfDeleted()) {
|
||||||
@Override
|
return getNewLatestEntryFuture(tenantId, entityId, query);
|
||||||
public void onSuccess(@Nullable Void result) {
|
|
||||||
if (query.getRewriteLatestIfDeleted()) {
|
|
||||||
ListenableFuture<TsKvEntry> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
|
||||||
if (isRemove) {
|
|
||||||
return getNewLatestEntryFuture(tenantId, entityId, query);
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}, service);
|
|
||||||
|
|
||||||
try {
|
|
||||||
resultFuture.set(savedLatestFuture.get());
|
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
|
||||||
log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
resultFuture.set(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
|
|
||||||
}
|
}
|
||||||
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
return resultFuture;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
|
protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
|
||||||
|
|||||||
@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
|
|||||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
||||||
@ -195,10 +196,10 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<TsKvEntry>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
|
public ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
|
||||||
validate(entityId);
|
validate(entityId);
|
||||||
deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
|
deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
|
||||||
List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
|
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
|
||||||
for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
|
for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
|
||||||
deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery);
|
deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery);
|
||||||
}
|
}
|
||||||
@ -206,9 +207,9 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<TsKvEntry>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
|
public ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
|
||||||
validate(entityId);
|
validate(entityId);
|
||||||
List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
|
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(keys.size());
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
|
DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
|
||||||
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
||||||
@ -229,7 +230,7 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvEntry>> futures, EntityId entityId, DeleteTsKvQuery query) {
|
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
||||||
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
|
||||||
futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
|
||||||
|
|||||||
@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
|
|||||||
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.dao.model.ModelConstants;
|
import org.thingsboard.server.dao.model.ModelConstants;
|
||||||
import org.thingsboard.server.dao.nosql.TbResultSet;
|
import org.thingsboard.server.dao.nosql.TbResultSet;
|
||||||
@ -114,7 +115,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(tenantId, entityId, query.getKey());
|
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(tenantId, entityId, query.getKey());
|
||||||
|
|
||||||
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
|
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
|
||||||
@ -127,44 +128,22 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
|
|||||||
return false;
|
return false;
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
|
|
||||||
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
ListenableFuture<Boolean> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
||||||
if (isRemove) {
|
if (isRemove) {
|
||||||
return deleteLatest(tenantId, entityId, query.getKey());
|
return Futures.transform(deleteLatest(tenantId, entityId, query.getKey()), res -> true, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(false);
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
|
|
||||||
final SimpleListenableFuture<TsKvEntry> resultFuture = new SimpleListenableFuture<>();
|
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
|
||||||
Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
|
if (isRemoved && query.getRewriteLatestIfDeleted()) {
|
||||||
@Override
|
return getNewLatestEntryFuture(tenantId, entityId, query);
|
||||||
public void onSuccess(@Nullable Void result) {
|
|
||||||
if (query.getRewriteLatestIfDeleted()) {
|
|
||||||
ListenableFuture<TsKvEntry> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
|
|
||||||
if (isRemove) {
|
|
||||||
return getNewLatestEntryFuture(tenantId, entityId, query);
|
|
||||||
}
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}, readResultsProcessingExecutor);
|
|
||||||
|
|
||||||
try {
|
|
||||||
resultFuture.set(savedLatestFuture.get());
|
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
|
||||||
log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
resultFuture.set(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Throwable t) {
|
|
||||||
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
|
|
||||||
}
|
}
|
||||||
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
return resultFuture;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TsKvEntry> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
|
||||||
long startTs = 0;
|
long startTs = 0;
|
||||||
long endTs = query.getStartTs() - 1;
|
long endTs = query.getStartTs() - 1;
|
||||||
ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
|
ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
|
||||||
@ -174,11 +153,11 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
|
|||||||
return Futures.transformAsync(future, entryList -> {
|
return Futures.transformAsync(future, entryList -> {
|
||||||
if (entryList.size() == 1) {
|
if (entryList.size() == 1) {
|
||||||
TsKvEntry entry = entryList.get(0);
|
TsKvEntry entry = entryList.get(0);
|
||||||
return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> entry, MoreExecutors.directExecutor());
|
return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
|
||||||
} else {
|
} else {
|
||||||
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true));
|
||||||
}, readResultsProcessingExecutor);
|
}, readResultsProcessingExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -32,7 +33,7 @@ public interface TimeseriesLatestDao {
|
|||||||
|
|
||||||
ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
||||||
|
|
||||||
ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
|
ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
|
||||||
|
|
||||||
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
|
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
|
||||||
|
|
||||||
|
|||||||
@ -55,5 +55,5 @@ public interface RuleEngineTelemetryService {
|
|||||||
|
|
||||||
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
|
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
|
||||||
|
|
||||||
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback<Void> callback);
|
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -90,7 +90,7 @@ export class AttributeDatasource implements DataSource<AttributeData> {
|
|||||||
return this.getAllAttributes(entityId, attributesScope).pipe(
|
return this.getAllAttributes(entityId, attributesScope).pipe(
|
||||||
map((data) => {
|
map((data) => {
|
||||||
let filteredData = [];
|
let filteredData = [];
|
||||||
for(let key in filteredData) {
|
for(let key in data) {
|
||||||
if(data[key]['value'] !== null) {
|
if(data[key]['value'] !== null) {
|
||||||
filteredData.push(data[key]);
|
filteredData.push(data[key]);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user