send notifications by rewriteLatestIfDeleted
This commit is contained in:
		
							parent
							
								
									b79c2407c4
								
							
						
					
					
						commit
						b8d49b66a6
					
				@ -573,7 +573,7 @@ public class TelemetryController extends BaseController {
 | 
			
		||||
            for (String key : keys) {
 | 
			
		||||
                deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
 | 
			
		||||
            }
 | 
			
		||||
            tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() {
 | 
			
		||||
            tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, rewriteLatestIfDeleted, new FutureCallback<>() {
 | 
			
		||||
                @Override
 | 
			
		||||
                public void onSuccess(@Nullable Void tmp) {
 | 
			
		||||
                    logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
 | 
			
		||||
 | 
			
		||||
@ -90,7 +90,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList
 | 
			
		||||
        Futures.addCallback(saveFuture, new FutureCallback<T>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable T result) {
 | 
			
		||||
                callback.accept(null);
 | 
			
		||||
                callback.accept(result);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
@ -46,7 +47,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.cluster.TbClusterService;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
 | 
			
		||||
 | 
			
		||||
import javax.annotation.Nullable;
 | 
			
		||||
@ -59,9 +59,12 @@ import java.util.Comparator;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.concurrent.ExecutorService;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 27.03.18.
 | 
			
		||||
@ -252,7 +255,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
 | 
			
		||||
        ListenableFuture<List<Void>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
 | 
			
		||||
        addVoidCallback(deleteFuture, callback);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -273,10 +276,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
 | 
			
		||||
        ListenableFuture<List<Void>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
 | 
			
		||||
    public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback<Void> callback) {
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
 | 
			
		||||
        addVoidCallback(deleteFuture, callback);
 | 
			
		||||
        addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys));
 | 
			
		||||
        addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list, rewriteLatestIfDeleted));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -345,11 +348,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys) {
 | 
			
		||||
    private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, List<TsKvEntry> ts, boolean rewriteLatestIfDeleted) {
 | 
			
		||||
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
 | 
			
		||||
        if (currentPartitions.contains(tpi)) {
 | 
			
		||||
            if (subscriptionManagerService.isPresent()) {
 | 
			
		||||
                subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY);
 | 
			
		||||
                Set<String> updated;
 | 
			
		||||
                if (rewriteLatestIfDeleted) {
 | 
			
		||||
                    List<TsKvEntry> filteredTs = ts.stream().filter(Objects::nonNull).collect(Collectors.toList());
 | 
			
		||||
                    subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY);
 | 
			
		||||
                    updated = filteredTs.stream().map(TsKvEntry::getKey).collect(Collectors.toSet());
 | 
			
		||||
                } else {
 | 
			
		||||
                    updated = Collections.emptySet();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                List<String> deleted = keys.stream().filter(key -> updated.isEmpty() || !updated.remove(key)).collect(Collectors.toList());
 | 
			
		||||
                subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.warn("Possible misconfiguration because subscriptionManagerService is null!");
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -43,9 +43,9 @@ public interface TimeseriesService {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
 | 
			
		||||
    ListenableFuture<List<TsKvEntry>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
 | 
			
		||||
    ListenableFuture<List<TsKvEntry>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -147,7 +147,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
    public ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        return getRemoveLatestFuture(tenantId, entityId, query);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -175,11 +175,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
 | 
			
		||||
        return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
    private ListenableFuture<TsKvEntry> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
 | 
			
		||||
        return Futures.transformAsync(future, entryList -> {
 | 
			
		||||
            if (entryList.size() == 1) {
 | 
			
		||||
                return getSaveLatestFuture(entityId, entryList.get(0));
 | 
			
		||||
                TsKvEntry entry = entryList.get(0);
 | 
			
		||||
                return Futures.transform(getSaveLatestFuture(entityId, entry), v -> entry, MoreExecutors.directExecutor());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
 | 
			
		||||
            }
 | 
			
		||||
@ -212,7 +213,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
 | 
			
		||||
        return Futures.immediateFuture(result);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected ListenableFuture<Void> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
    protected ListenableFuture<TsKvEntry> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
 | 
			
		||||
@ -233,12 +234,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }, service);
 | 
			
		||||
 | 
			
		||||
        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        final SimpleListenableFuture<TsKvEntry> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable Void result) {
 | 
			
		||||
                if (query.getRewriteLatestIfDeleted()) {
 | 
			
		||||
                    ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                    ListenableFuture<TsKvEntry> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                        if (isRemove) {
 | 
			
		||||
                            return getNewLatestEntryFuture(tenantId, entityId, query);
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
@ -195,10 +195,10 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
 | 
			
		||||
    public ListenableFuture<List<TsKvEntry>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
 | 
			
		||||
        validate(entityId);
 | 
			
		||||
        deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
 | 
			
		||||
        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
 | 
			
		||||
        List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
 | 
			
		||||
        for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
 | 
			
		||||
            deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery);
 | 
			
		||||
        }
 | 
			
		||||
@ -206,9 +206,9 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
 | 
			
		||||
    public ListenableFuture<List<TsKvEntry>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
 | 
			
		||||
        validate(entityId);
 | 
			
		||||
        List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(keys.size());
 | 
			
		||||
        List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
 | 
			
		||||
        for (String key : keys) {
 | 
			
		||||
            DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
 | 
			
		||||
            futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
 | 
			
		||||
@ -229,10 +229,10 @@ public class BaseTimeseriesService implements TimeseriesService {
 | 
			
		||||
        }, MoreExecutors.directExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        futures.add(timeseriesDao.remove(tenantId, entityId, query));
 | 
			
		||||
    private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvEntry>> futures, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
 | 
			
		||||
        futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
 | 
			
		||||
        futures.add(timeseriesDao.removePartition(tenantId, entityId, query));
 | 
			
		||||
        futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static void validate(EntityId entityId) {
 | 
			
		||||
 | 
			
		||||
@ -114,7 +114,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
    public ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(tenantId, entityId, query.getKey());
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
 | 
			
		||||
@ -134,12 +134,12 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }, readResultsProcessingExecutor);
 | 
			
		||||
 | 
			
		||||
        final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        final SimpleListenableFuture<TsKvEntry> resultFuture = new SimpleListenableFuture<>();
 | 
			
		||||
        Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onSuccess(@Nullable Void result) {
 | 
			
		||||
                if (query.getRewriteLatestIfDeleted()) {
 | 
			
		||||
                    ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                    ListenableFuture<TsKvEntry> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
 | 
			
		||||
                        if (isRemove) {
 | 
			
		||||
                            return getNewLatestEntryFuture(tenantId, entityId, query);
 | 
			
		||||
                        }
 | 
			
		||||
@ -164,7 +164,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
 | 
			
		||||
        return resultFuture;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
    private ListenableFuture<TsKvEntry> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
 | 
			
		||||
        long startTs = 0;
 | 
			
		||||
        long endTs = query.getStartTs() - 1;
 | 
			
		||||
        ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
 | 
			
		||||
@ -173,7 +173,8 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
 | 
			
		||||
 | 
			
		||||
        return Futures.transformAsync(future, entryList -> {
 | 
			
		||||
            if (entryList.size() == 1) {
 | 
			
		||||
                return saveLatest(tenantId, entityId, entryList.get(0));
 | 
			
		||||
                TsKvEntry entry = entryList.get(0);
 | 
			
		||||
                return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> entry, MoreExecutors.directExecutor());
 | 
			
		||||
            } else {
 | 
			
		||||
                log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,7 @@ public interface TimeseriesLatestDao {
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
 | 
			
		||||
    ListenableFuture<TsKvEntry> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
 | 
			
		||||
 | 
			
		||||
    List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -55,5 +55,5 @@ public interface RuleEngineTelemetryService {
 | 
			
		||||
 | 
			
		||||
    void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
 | 
			
		||||
 | 
			
		||||
    void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
 | 
			
		||||
    void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback<Void> callback);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -88,7 +88,15 @@ export class AttributeDatasource implements DataSource<AttributeData> {
 | 
			
		||||
  fetchAttributes(entityId: EntityId, attributesScope: TelemetryType,
 | 
			
		||||
                  pageLink: PageLink): Observable<PageData<AttributeData>> {
 | 
			
		||||
    return this.getAllAttributes(entityId, attributesScope).pipe(
 | 
			
		||||
      map((data) => pageLink.filterData(data))
 | 
			
		||||
      map((data) => {
 | 
			
		||||
        let filteredData = [];
 | 
			
		||||
        for(let key in filteredData) {
 | 
			
		||||
          if(data[key]['value'] !== null) {
 | 
			
		||||
            filteredData.push(data[key]);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        return pageLink.filterData(filteredData);
 | 
			
		||||
      })
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user