Return TimeseriesSaveResult
This commit is contained in:
parent
7d8a76ce7f
commit
0f34f131c9
@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
@ -126,10 +127,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
||||||
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
||||||
KvUtils.validate(request.getEntries(), valueNoXssValidation);
|
KvUtils.validate(request.getEntries(), valueNoXssValidation);
|
||||||
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
|
ListenableFuture<TimeseriesSaveResult> future = saveTimeseriesInternal(request);
|
||||||
if (!request.isOnlyLatest()) {
|
if (!request.isOnlyLatest()) {
|
||||||
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
|
Futures.addCallback(future, getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant), tsCallBackExecutor);
|
||||||
Futures.addCallback(future, callback, tsCallBackExecutor);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
|
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
|
||||||
@ -137,27 +137,27 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
public ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
||||||
TenantId tenantId = request.getTenantId();
|
TenantId tenantId = request.getTenantId();
|
||||||
EntityId entityId = request.getEntityId();
|
EntityId entityId = request.getEntityId();
|
||||||
ListenableFuture<Integer> saveFuture;
|
ListenableFuture<TimeseriesSaveResult> resultFuture;
|
||||||
if (request.isOnlyLatest()) {
|
if (request.isOnlyLatest()) {
|
||||||
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
|
resultFuture = tsService.saveLatest(tenantId, entityId, request.getEntries());
|
||||||
} else if (request.isSaveLatest()) {
|
} else if (request.isSaveLatest()) {
|
||||||
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
resultFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||||
} else {
|
} else {
|
||||||
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||||
}
|
}
|
||||||
|
|
||||||
addMainCallback(saveFuture, request.getCallback());
|
addMainCallback(resultFuture, request.getCallback());
|
||||||
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
||||||
if (request.isSaveLatest() && !request.isOnlyLatest()) {
|
if (request.isSaveLatest() && !request.isOnlyLatest()) {
|
||||||
addEntityViewCallback(tenantId, entityId, request.getEntries());
|
addEntityViewCallback(tenantId, entityId, request.getEntries());
|
||||||
}
|
}
|
||||||
// Use something very similar to addMainCallback. don't forget about tsCallBackExecutor.
|
// Use something very similar to addMainCallback. don't forget about tsCallBackExecutor.
|
||||||
//CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest
|
//CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest
|
||||||
addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor);
|
addCallback(resultFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor);
|
||||||
return saveFuture;
|
return resultFuture;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -329,19 +329,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private FutureCallback<Integer> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback<Void> callback) {
|
private FutureCallback<TimeseriesSaveResult> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant) {
|
||||||
return new FutureCallback<>() {
|
return new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Integer result) {
|
public void onSuccess(TimeseriesSaveResult result) {
|
||||||
if (!sysTenant && result != null && result > 0) {
|
Integer dataPoints = result.getDataPoints();
|
||||||
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
|
if (!sysTenant && dataPoints != null && dataPoints > 0) {
|
||||||
|
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, dataPoints);
|
||||||
}
|
}
|
||||||
callback.onSuccess(null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
callback.onFailure(t);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,13 +21,14 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
|||||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
|
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
|
||||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||||
|
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by ashvayka on 27.03.18.
|
* Created by ashvayka on 27.03.18.
|
||||||
*/
|
*/
|
||||||
public interface InternalTelemetryService extends RuleEngineTelemetryService {
|
public interface InternalTelemetryService extends RuleEngineTelemetryService {
|
||||||
|
|
||||||
ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request);
|
ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request);
|
||||||
|
|
||||||
void saveAttributesInternal(AttributesSaveRequest request);
|
void saveAttributesInternal(AttributesSaveRequest request);
|
||||||
|
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId;
|
|||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||||
|
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
|
|
||||||
@ -44,13 +45,13 @@ public interface TimeseriesService {
|
|||||||
|
|
||||||
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
|
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
||||||
|
|
||||||
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||||
|
|
||||||
ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
ListenableFuture<TimeseriesSaveResult> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||||
|
|
||||||
ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
|
ListenableFuture<TimeseriesSaveResult> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries);
|
||||||
|
|
||||||
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
|
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,26 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.kv;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data(staticConstructor = "of")
|
||||||
|
public class TimeseriesSaveResult {
|
||||||
|
private final Integer dataPoints;
|
||||||
|
private final List<Long> versions;
|
||||||
|
}
|
||||||
@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
|||||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||||
|
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||||
@ -156,60 +157,48 @@ public class BaseTimeseriesService implements TimeseriesService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
public ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||||
validate(entityId);
|
validate(entityId);
|
||||||
List<ListenableFuture<Integer>> futures = new ArrayList<>(INSERTS_PER_ENTRY);
|
return doSave(tenantId, entityId, List.of(tsKvEntry), 0L, true, true);
|
||||||
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L);
|
|
||||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
public ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||||
return doSave(tenantId, entityId, tsKvEntries, ttl, true);
|
return doSave(tenantId, entityId, tsKvEntries, ttl, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
public ListenableFuture<TimeseriesSaveResult> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||||
return doSave(tenantId, entityId, tsKvEntries, ttl, false);
|
return doSave(tenantId, entityId, tsKvEntries, ttl, false, true);
|
||||||
}
|
|
||||||
|
|
||||||
private ListenableFuture<Integer> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest) {
|
|
||||||
int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST;
|
|
||||||
List<ListenableFuture<Integer>> futures = new ArrayList<>(tsKvEntries.size() * inserts);
|
|
||||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
|
||||||
if (saveLatest) {
|
|
||||||
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
|
||||||
} else {
|
|
||||||
saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
|
public ListenableFuture<TimeseriesSaveResult> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
|
||||||
List<ListenableFuture<Long>> futures = new ArrayList<>(tsKvEntries.size());
|
return doSave(tenantId, entityId, tsKvEntries, 0L, true, false);
|
||||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
|
||||||
futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
|
|
||||||
}
|
|
||||||
return Futures.allAsList(futures);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
private ListenableFuture<TimeseriesSaveResult> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest, boolean saveTs) {
|
||||||
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
if (saveTs && entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
||||||
futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
|
||||||
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doSaveAndRegisterFuturesFor(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
|
||||||
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
|
||||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
||||||
}
|
}
|
||||||
futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
|
List<ListenableFuture<Integer>> tsFutures = saveTs ? new ArrayList<>(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST) : null;
|
||||||
futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
|
List<ListenableFuture<Long>> latestFutures = saveLatest ? new ArrayList<>(tsKvEntries.size()) : null;
|
||||||
|
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
||||||
|
if (saveTs) {
|
||||||
|
tsFutures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
|
||||||
|
tsFutures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
|
||||||
|
}
|
||||||
|
if (saveLatest) {
|
||||||
|
latestFutures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
||||||
|
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
||||||
|
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
||||||
|
Integer dataPoints = Futures.getUnchecked(dpsFuture);
|
||||||
|
List<Long> versions = Futures.getUnchecked(versionsFuture);
|
||||||
|
return TimeseriesSaveResult.of(dataPoints, versions);
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ReadTsKvQuery> updateQueriesForEntityView(EntityView entityView, List<ReadTsKvQuery> queries) {
|
private List<ReadTsKvQuery> updateQueriesForEntityView(EntityView entityView, List<ReadTsKvQuery> queries) {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user