Merge PR #12487
This commit is contained in:
commit
ebfb51a59e
@ -42,6 +42,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
@ -127,7 +128,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
||||
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
||||
KvUtils.validate(request.getEntries(), valueNoXssValidation);
|
||||
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
|
||||
ListenableFuture<TimeseriesSaveResult> future = saveTimeseriesInternal(request);
|
||||
if (!request.isOnlyLatest()) {
|
||||
Futures.addCallback(future, getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant), tsCallBackExecutor);
|
||||
}
|
||||
@ -137,32 +138,26 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
||||
public ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
||||
TenantId tenantId = request.getTenantId();
|
||||
EntityId entityId = request.getEntityId();
|
||||
ListenableFuture<Integer> saveFuture;
|
||||
ListenableFuture<TimeseriesSaveResult> resultFuture;
|
||||
if (request.isOnlyLatest()) {
|
||||
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
|
||||
resultFuture = tsService.saveLatest(tenantId, entityId, request.getEntries());
|
||||
} else if (request.isSaveLatest()) {
|
||||
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||
resultFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||
} else {
|
||||
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||
resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||
}
|
||||
// We need to guarantee, that the message is successfully pushed to the calculated fields service before we execute any callbacks.
|
||||
// saveFuture = Futures.transformAsync(saveFuture, new AsyncFunction<Integer, Integer>() {
|
||||
// @Override
|
||||
// public ListenableFuture<Integer> apply(Integer input) throws Exception {
|
||||
// calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(request));
|
||||
// return input;
|
||||
// }
|
||||
// });
|
||||
addMainCallback(saveFuture, request.getCallback());
|
||||
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
||||
addMainCallback(resultFuture, request.getCallback());
|
||||
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
||||
if (request.isSaveLatest() && !request.isOnlyLatest()) {
|
||||
addEntityViewCallback(tenantId, entityId, request.getEntries());
|
||||
}
|
||||
addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(request)), tsCallBackExecutor);
|
||||
return saveFuture;
|
||||
// Use something very similar to addMainCallback. don't forget about tsCallBackExecutor.
|
||||
//CalculatedFieldTimeSeriesUpdateRequest - add constructor that accepts the TimeseriesSaveRequest
|
||||
addCallback(resultFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getPreviousCalculatedFieldIds())), tsCallBackExecutor);
|
||||
return resultFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -333,18 +328,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
}
|
||||
}
|
||||
|
||||
private FutureCallback<Integer> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant) {
|
||||
private FutureCallback<TimeseriesSaveResult> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant) {
|
||||
return new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(Integer result) {
|
||||
if (!sysTenant && result != null && result > 0) {
|
||||
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
|
||||
public void onSuccess(TimeseriesSaveResult result) {
|
||||
Integer dataPoints = result.getDataPoints();
|
||||
if (!sysTenant && dataPoints != null && dataPoints > 0) {
|
||||
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, dataPoints);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -21,13 +21,14 @@ import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 27.03.18.
|
||||
*/
|
||||
public interface InternalTelemetryService extends RuleEngineTelemetryService {
|
||||
|
||||
ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request);
|
||||
ListenableFuture<TimeseriesSaveResult> saveTimeseriesInternal(TimeseriesSaveRequest request);
|
||||
|
||||
void saveAttributesInternal(AttributesSaveRequest request);
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||
|
||||
@ -44,13 +45,13 @@ public interface TimeseriesService {
|
||||
|
||||
ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId);
|
||||
|
||||
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
||||
ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
|
||||
|
||||
ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||
ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||
|
||||
ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||
ListenableFuture<TimeseriesSaveResult> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry, long ttl);
|
||||
|
||||
ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
|
||||
ListenableFuture<TimeseriesSaveResult> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries);
|
||||
|
||||
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
|
||||
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.kv;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data(staticConstructor = "of")
|
||||
public class TimeseriesSaveResult {
|
||||
private final Integer dataPoints;
|
||||
private final List<Long> versions;
|
||||
}
|
||||
@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
@ -156,60 +157,48 @@ public class BaseTimeseriesService implements TimeseriesService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||
public ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
|
||||
validate(entityId);
|
||||
List<ListenableFuture<Integer>> futures = new ArrayList<>(INSERTS_PER_ENTRY);
|
||||
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, 0L);
|
||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
||||
return doSave(tenantId, entityId, List.of(tsKvEntry), 0L, true, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Integer> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||
return doSave(tenantId, entityId, tsKvEntries, ttl, true);
|
||||
public ListenableFuture<TimeseriesSaveResult> save(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||
return doSave(tenantId, entityId, tsKvEntries, ttl, true, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Integer> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||
return doSave(tenantId, entityId, tsKvEntries, ttl, false);
|
||||
}
|
||||
|
||||
private ListenableFuture<Integer> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest) {
|
||||
int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST;
|
||||
List<ListenableFuture<Integer>> futures = new ArrayList<>(tsKvEntries.size() * inserts);
|
||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
||||
if (saveLatest) {
|
||||
saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||
} else {
|
||||
saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||
}
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor());
|
||||
public ListenableFuture<TimeseriesSaveResult> saveWithoutLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl) {
|
||||
return doSave(tenantId, entityId, tsKvEntries, ttl, false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<Long>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
|
||||
List<ListenableFuture<Long>> futures = new ArrayList<>(tsKvEntries.size());
|
||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
||||
futures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
|
||||
}
|
||||
return Futures.allAsList(futures);
|
||||
public ListenableFuture<TimeseriesSaveResult> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries) {
|
||||
return doSave(tenantId, entityId, tsKvEntries, 0L, true, false);
|
||||
}
|
||||
|
||||
private void saveAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||
futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor()));
|
||||
}
|
||||
|
||||
private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl);
|
||||
}
|
||||
|
||||
private void doSaveAndRegisterFuturesFor(TenantId tenantId, List<ListenableFuture<Integer>> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) {
|
||||
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
||||
private ListenableFuture<TimeseriesSaveResult> doSave(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntries, long ttl, boolean saveLatest, boolean saveTs) {
|
||||
if (saveTs && entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
|
||||
throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only");
|
||||
}
|
||||
futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
|
||||
futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
|
||||
List<ListenableFuture<Integer>> tsFutures = saveTs ? new ArrayList<>(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST) : null;
|
||||
List<ListenableFuture<Long>> latestFutures = saveLatest ? new ArrayList<>(tsKvEntries.size()) : null;
|
||||
for (TsKvEntry tsKvEntry : tsKvEntries) {
|
||||
if (saveTs) {
|
||||
tsFutures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey()));
|
||||
tsFutures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl));
|
||||
}
|
||||
if (saveLatest) {
|
||||
latestFutures.add(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry));
|
||||
}
|
||||
}
|
||||
ListenableFuture<Integer> dpsFuture = saveTs ? Futures.transform(Futures.allAsList(tsFutures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()) : Futures.immediateFuture(0);
|
||||
ListenableFuture<List<Long>> versionsFuture = saveLatest ? Futures.allAsList(latestFutures) : Futures.immediateFuture(null);
|
||||
return Futures.whenAllComplete(dpsFuture, versionsFuture).call(() -> {
|
||||
Integer dataPoints = Futures.getUnchecked(dpsFuture);
|
||||
List<Long> versions = Futures.getUnchecked(versionsFuture);
|
||||
return TimeseriesSaveResult.of(dataPoints, versions);
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
private List<ReadTsKvQuery> updateQueriesForEntityView(EntityView entityView, List<ReadTsKvQuery> queries) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user