diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 70a6b029ed..dfe0701642 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -573,7 +573,7 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); } - tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { + tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, rewriteLatestIfDeleted, new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java index bbd3811075..3512c37134 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java @@ -90,7 +90,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList Futures.addCallback(saveFuture, new FutureCallback() { @Override public void onSuccess(@Nullable T result) { - callback.accept(null); + callback.accept(result); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index dba664013d..8a911962f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; @@ -46,7 +47,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import javax.annotation.Nullable; @@ -59,9 +59,12 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * Created by ashvayka on 27.03.18. @@ -252,7 +255,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); + ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); addVoidCallback(deleteFuture, callback); } @@ -273,10 +276,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); + public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys)); + addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list, rewriteLatestIfDeleted)); } @Override @@ -345,11 +348,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys) { + private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, List ts, boolean rewriteLatestIfDeleted) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY); + Set updated; + if (rewriteLatestIfDeleted) { + List filteredTs = ts.stream().filter(Objects::nonNull).collect(Collectors.toList()); + subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY); + updated = filteredTs.stream().map(TsKvEntry::getKey).collect(Collectors.toSet()); + } else { + updated = Collections.emptySet(); + } + + List deleted = keys.stream().filter(key -> updated.isEmpty() || !updated.remove(key)).collect(Collectors.toList()); + subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index b1a2541fc7..4da8c14eeb 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -43,9 +43,9 @@ public interface TimeseriesService { ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); - ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); + ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); - ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); + ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 3937b7c2c5..3534a4c253 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -147,7 +147,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { return getRemoveLatestFuture(tenantId, entityId, query); } @@ -175,11 +175,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { - return getSaveLatestFuture(entityId, entryList.get(0)); + TsKvEntry entry = entryList.get(0); + return Futures.transform(getSaveLatestFuture(entityId, entry), v -> entry, MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } @@ -212,7 +213,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(result); } - protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestFuture = getFindLatestFuture(entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestFuture, tsKvEntry -> { @@ -233,12 +234,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(null); }, service); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(removedLatestFuture, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { return getNewLatestEntryFuture(tenantId, entityId, query); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 3170f6c256..95be63dae7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -195,10 +195,10 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { + public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { validate(entityId); deleteTsKvQueries.forEach(BaseTimeseriesService::validate); - List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); + List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); } @@ -206,9 +206,9 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { + public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + List> futures = Lists.newArrayListWithExpectedSize(keys.size()); for (String key : keys) { DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); @@ -229,10 +229,10 @@ public class BaseTimeseriesService implements TimeseriesService { }, MoreExecutors.directExecutor()); } - private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { - futures.add(timeseriesDao.remove(tenantId, entityId, query)); + private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { + futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); - futures.add(timeseriesDao.removePartition(tenantId, entityId, query)); + futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 0b4ae71e85..6e9bea0ecb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -114,7 +114,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestEntryFuture = findLatest(tenantId, entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> { @@ -134,12 +134,12 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.immediateFuture(null); }, readResultsProcessingExecutor); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(removedLatestFuture, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { return getNewLatestEntryFuture(tenantId, entityId, query); } @@ -164,7 +164,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return resultFuture; } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, @@ -173,7 +173,8 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { - return saveLatest(tenantId, entityId, entryList.get(0)); + TsKvEntry entry = entryList.get(0); + return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> entry, MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index f19cc461dc..d0253db0bd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -32,7 +32,7 @@ public interface TimeseriesLatestDao { ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); - ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); + ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index a19d0fad0c..fe0b39c299 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -55,5 +55,5 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); + void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback); } diff --git a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts index 89a0c8f5b2..99907bcaa9 100644 --- a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts +++ b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts @@ -88,7 +88,15 @@ export class AttributeDatasource implements DataSource { fetchAttributes(entityId: EntityId, attributesScope: TelemetryType, pageLink: PageLink): Observable> { return this.getAllAttributes(entityId, attributesScope).pipe( - map((data) => pageLink.filterData(data)) + map((data) => { + let filteredData = []; + for(let key in filteredData) { + if(data[key]['value'] !== null) { + filteredData.push(data[key]); + } + } + return pageLink.filterData(filteredData); + }) ); }