diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index f873e48774..e7dfbcdca9 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -126,10 +127,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { KvUtils.validate(request.getEntries(), valueNoXssValidation); - ListenableFuture future = saveTimeseriesInternal(request); + ListenableFuture future = saveTimeseriesInternal(request); if (!request.isOnlyLatest()) { - FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); - Futures.addCallback(future, callback, tsCallBackExecutor); + Futures.addCallback(future, getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant), tsCallBackExecutor); } } else { request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); @@ -137,27 +137,27 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request) { + public ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); - ListenableFuture saveFuture; + ListenableFuture resultFuture; if (request.isOnlyLatest()) { - saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); + resultFuture = tsService.saveLatest(tenantId, entityId, request.getEntries()); } else if (request.isSaveLatest()) { - saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); + resultFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); } else { - saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); + resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); } - addMainCallback(saveFuture, request.getCallback()); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); + addMainCallback(resultFuture, request.getCallback()); + addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } // Use something very similar to addMainCallback. don't forget about tsCallBackExecutor. //CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest - addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor); - return saveFuture; + addCallback(resultFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor); + return resultFuture; } @Override @@ -329,19 +329,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private FutureCallback getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback callback) { + private FutureCallback getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant) { return new FutureCallback<>() { @Override - public void onSuccess(Integer result) { - if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + public void onSuccess(TimeseriesSaveResult result) { + Integer dataPoints = result.getDataPoints(); + if (!sysTenant && dataPoints != null && dataPoints > 0) { + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, dataPoints); } - callback.onSuccess(null); } @Override public void onFailure(Throwable t) { - callback.onFailure(t); } }; } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 8e45b84a75..8a236bf002 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -21,13 +21,14 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; /** * Created by ashvayka on 27.03.18. */ public interface InternalTelemetryService extends RuleEngineTelemetryService { - ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request); + ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request); void saveAttributesInternal(AttributesSaveRequest request); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 49918f3823..542f77445b 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; @@ -44,13 +45,13 @@ public interface TimeseriesService { ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId); - ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); + ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); - ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); - ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); - ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); + ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries); ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TimeseriesSaveResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TimeseriesSaveResult.java new file mode 100644 index 0000000000..edec063be2 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TimeseriesSaveResult.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import lombok.Data; + +import java.util.List; + +@Data(staticConstructor = "of") +public class TimeseriesSaveResult { + private final Integer dataPoints; + private final List versions; +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 756b73d88b..6b4a3c917e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.dao.entityview.EntityViewService; @@ -156,60 +157,48 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { validate(entityId); - List> futures = new ArrayList<>(INSERTS_PER_ENTRY); - saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L); - return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); + return doSave(tenantId, entityId, List.of(tsKvEntry), 0L, true, true); } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { - return doSave(tenantId, entityId, tsKvEntries, ttl, true); + public ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { + return doSave(tenantId, entityId, tsKvEntries, ttl, true, true); } @Override - public ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { - return doSave(tenantId, entityId, tsKvEntries, ttl, false); - } - - private ListenableFuture doSave(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest) { - int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST; - List> futures = new ArrayList<>(tsKvEntries.size() * inserts); - for (TsKvEntry tsKvEntry : tsKvEntries) { - if (saveLatest) { - saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); - } else { - saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); - } - } - return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); + public ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { + return doSave(tenantId, entityId, tsKvEntries, ttl, false, true); } @Override - public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { - List> futures = new ArrayList<>(tsKvEntries.size()); - for (TsKvEntry tsKvEntry : tsKvEntries) { - futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); - } - return Futures.allAsList(futures); + public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { + return doSave(tenantId, entityId, tsKvEntries, 0L, true, false); } - private void saveAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { - doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl); - futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor())); - } - - private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { - doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl); - } - - private void doSaveAndRegisterFuturesFor(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest, boolean saveTs) { + if (saveTs && entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); } - futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); - futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); + List> tsFutures = saveTs ? new ArrayList<>(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST) : null; + List> latestFutures = saveLatest ? new ArrayList<>(tsKvEntries.size()) : null; + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (saveTs) { + tsFutures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); + tsFutures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); + } + if (saveLatest) { + latestFutures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry)); + } + } + ListenableFuture dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0); + ListenableFuture> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null); + return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> { + Integer dataPoints = Futures.getUnchecked(dpsFuture); + List versions = Futures.getUnchecked(versionsFuture); + return TimeseriesSaveResult.of(dataPoints, versions); + }, MoreExecutors.directExecutor()); } private List updateQueriesForEntityView(EntityView entityView, List queries) {