Refactor saveAndNotifyInternal for timeseries; save latest by default

This commit is contained in:
ViacheslavKlimov 2024-12-17 12:25:57 +02:00
parent 2bb65923dc
commit 6c0bca2bae
13 changed files with 130 additions and 96 deletions

View File

@ -679,7 +679,6 @@ public class TelemetryController extends BaseController {
.entityId(entityId)
.entries(entries)
.ttl(tenantTtl)
.saveLatest(true)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {

View File

@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.ApiFeature;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageRecordState;
@ -91,9 +92,9 @@ import java.util.stream.Collectors;
public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService<EntityId> implements TbApiUsageStateService {
public static final String HOURLY = "Hourly";
public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
public static final FutureCallback<Void> VOID_CALLBACK = new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Integer result) {
public void onSuccess(@Nullable Void result) {
}
@Override
@ -214,7 +215,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
updateLock.unlock();
}
log.trace("[{}][{}] Saving new stats: {}", tenantId, ownerId, updatedEntries);
tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK);
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(usageState.getApiUsageState().getId())
.entries(updatedEntries)
.callback(VOID_CALLBACK)
.build());
if (!result.isEmpty()) {
persistAndNotify(usageState, result);
}
@ -321,7 +327,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
}
}
if (!profileThresholds.isEmpty()) {
tsWsService.saveAndNotifyInternal(tenantId, id, profileThresholds, VOID_CALLBACK);
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(id)
.entries(profileThresholds)
.callback(VOID_CALLBACK)
.build());
}
}
@ -348,7 +359,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
long ts = System.currentTimeMillis();
List<TsKvEntry> stateTelemetry = new ArrayList<>();
result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))));
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK);
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId())
.entries(stateTelemetry)
.callback(VOID_CALLBACK)
.build());
if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) {
String email = tenantService.findTenantById(state.getTenantId()).getEmail();
@ -436,7 +452,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L)))
.collect(Collectors.toList());
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK);
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId())
.entries(counts)
.callback(VOID_CALLBACK)
.build());
}
BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId ownerId) {

View File

@ -69,7 +69,6 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -507,8 +506,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
tsSubService.save(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entries(Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))))
.saveLatest(true)
.entry(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value)))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {
@ -522,8 +520,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
tsSubService.save(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entries(Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))))
.saveLatest(true)
.entry(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value)))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {

View File

@ -55,7 +55,6 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -266,7 +265,6 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
.tenantId(tenantId)
.entityId(deviceId)
.entries(telemetry)
.saveLatest(true)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
@ -292,9 +290,8 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
telemetryService.save(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.entries(Collections.singletonList(status))
.saveLatest(true)
.callback(new FutureCallback<Void>() {
.entry(status)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
log.trace("[{}] Success save telemetry with target {} for device!", deviceId, otaPackage);

View File

@ -38,6 +38,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.AttributeScope;
@ -866,10 +867,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private void save(DeviceId deviceId, String key, long value) {
if (persistToTelemetry) {
tsSubService.saveAndNotifyInternal(
TenantId.SYS_TENANT_ID, deviceId,
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new LongDataEntry(key, value))),
telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value));
tsSubService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(deviceId)
.entry(new BasicTsKvEntry(getCurrentTimeMillis(), new LongDataEntry(key, value)))
.ttl(telemetryTtl)
.callback(new TelemetrySaveCallback<>(deviceId, key, value))
.build());
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
}
@ -877,10 +881,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private void save(DeviceId deviceId, String key, boolean value) {
if (persistToTelemetry) {
tsSubService.saveAndNotifyInternal(
TenantId.SYS_TENANT_ID, deviceId,
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new BooleanDataEntry(key, value))),
telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value));
tsSubService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(deviceId)
.entry(new BasicTsKvEntry(getCurrentTimeMillis(), new BooleanDataEntry(key, value)))
.ttl(telemetryTtl)
.callback(new TelemetrySaveCallback<>(deviceId, key, value))
.build());
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
}

View File

@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.id.QueueStatsId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
@ -37,7 +38,6 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -53,9 +53,9 @@ import java.util.stream.Collectors;
public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService {
public static final String RULE_ENGINE_EXCEPTION = "ruleEngineException";
public static final FutureCallback<Integer> CALLBACK = new FutureCallback<Integer>() {
public static final FutureCallback<Void> CALLBACK = new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Integer result) {
public void onSuccess(@Nullable Void result) {
}
@ -89,7 +89,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
if (!tsList.isEmpty()) {
long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getQueueStatsTtlDays);
ttl = TimeUnit.DAYS.toSeconds(ttl);
tsService.saveAndNotifyInternal(tenantId, queueStatsId, tsList, ttl, CALLBACK);
tsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(queueStatsId)
.entries(tsList)
.ttl(ttl)
.callback(CALLBACK)
.build());
}
}
} catch (Exception e) {
@ -103,7 +109,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry(RULE_ENGINE_EXCEPTION, e.toJsonString(maxErrorMessageLength)));
long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getRuleEngineExceptionsTtlDays);
ttl = TimeUnit.DAYS.toSeconds(ttl);
tsService.saveAndNotifyInternal(tenantId, getQueueStatsId(tenantId, queueName), Collections.singletonList(tsKv), ttl, CALLBACK);
tsService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(getQueueStatsId(tenantId, queueName))
.entry(tsKv)
.ttl(ttl)
.callback(CALLBACK)
.build());
} catch (Exception e2) {
if (!"Asset is referencing to non-existent tenant!".equalsIgnoreCase(e2.getMessage())) {
log.debug("[{}] Failed to store the statistics", tenantId, e2);

View File

@ -213,8 +213,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
.entityId(entityId)
.entries(timeseries)
.ttl(tenantTtl)
.saveLatest(true)
.callback(new FutureCallback<Void>() {
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null,

View File

@ -27,6 +27,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.FeaturesInfo;
@ -71,9 +72,9 @@ import static org.thingsboard.common.util.SystemUtil.getTotalMemory;
@Slf4j
public class DefaultSystemInfoService extends TbApplicationEventListener<PartitionChangeEvent> implements SystemInfoService {
public static final FutureCallback<Integer> CALLBACK = new FutureCallback<>() {
public static final FutureCallback<Void> CALLBACK = new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Integer result) {
public void onSuccess(@Nullable Void result) {
}
@Override
@ -200,7 +201,13 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
private void doSave(List<TsKvEntry> telemetry) {
ApiUsageState apiUsageState = apiUsageStateClient.getApiUsageState(TenantId.SYS_TENANT_ID);
telemetryService.saveAndNotifyInternal(TenantId.SYS_TENANT_ID, apiUsageState.getId(), telemetry, systemInfoTtlSeconds, CALLBACK);
telemetryService.saveInternal(TimeseriesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(apiUsageState.getId())
.entries(telemetry)
.ttl(systemInfoTtlSeconds)
.callback(CALLBACK)
.build());
}
private List<SystemInfoData> getSystemData(ServiceInfo serviceInfo) {

View File

@ -116,6 +116,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
super.shutdownExecutor();
}
@Override
public void save(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
KvUtils.validate(request.getEntries(), valueNoXssValidation);
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
ListenableFuture<Integer> future = saveInternal(request);
Futures.addCallback(future, callback, tsCallBackExecutor);
} else {
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
}
}
@Override
public ListenableFuture<Void> saveAndNotify(TimeseriesSaveRequest request) {
SettableFuture<Void> future = SettableFuture.create();
@ -125,58 +141,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
@Override
public void save(TimeseriesSaveRequest request) {
public ListenableFuture<Integer> saveInternal(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
KvUtils.validate(request.getEntries(), valueNoXssValidation);
FutureCallback<Integer> callback = getCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
ListenableFuture<Integer> saveFuture;
if (request.isSaveLatest()) {
saveAndNotifyInternal(tenantId, entityId, request.getEntries(), request.getTtl(), callback);
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
saveWithoutLatestAndNotifyInternal(tenantId, entityId, request.getEntries(), request.getTtl(), callback);
}
} else {
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
}
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
}
private FutureCallback<Integer> getCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback<Void> callback) {
return new FutureCallback<>() {
@Override
public void onSuccess(Integer result) {
if (!sysTenant && result != null && result > 0) {
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
}
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
};
}
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback) {
saveAndNotifyInternal(tenantId, entityId, ts, 0L, callback);
}
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
ListenableFuture<Integer> saveFuture = tsService.save(tenantId, entityId, ts, ttl);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
addEntityViewCallback(tenantId, entityId, ts);
}
private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback) {
ListenableFuture<Integer> saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl);
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
return saveFuture;
}
private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
@ -452,11 +432,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}, tsCallBackExecutor);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<S> callback) {
private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable S result) {
callback.onSuccess(result);
callback.onSuccess(null);
}
@Override
@ -472,6 +452,23 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private FutureCallback<Integer> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback<Void> callback) {
return new FutureCallback<>() {
@Override
public void onSuccess(Integer result) {
if (!sysTenant && result != null && result > 0) {
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
}
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
};
}
private static class VoidFutureCallback implements FutureCallback<Void> {
private final SettableFuture<Void> future;

View File

@ -16,7 +16,9 @@
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -30,9 +32,7 @@ import java.util.List;
*/
public interface InternalTelemetryService extends RuleEngineTelemetryService {
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Integer> callback);
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Integer> callback);
ListenableFuture<Integer> saveInternal(TimeseriesSaveRequest request);
@Deprecated(since = "3.7.0")
void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);

View File

@ -807,11 +807,8 @@ public class WebsocketApiTest extends AbstractControllerTest {
CountDownLatch latch = new CountDownLatch(1);
tsService.save(TimeseriesSaveRequest.builder()
.tenantId(device.getTenantId())
.customerId(null)
.entityId(device.getId())
.entries(tsData)
.ttl(0)
.saveLatest(true)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {

View File

@ -18,7 +18,8 @@ package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -26,15 +27,17 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
@Data
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TimeseriesSaveRequest {
private final TenantId tenantId;
private final CustomerId customerId;
private final EntityId entityId;
private final List<TsKvEntry> entries;
private final long ttl;
private final boolean saveLatest;
@Setter
private FutureCallback<Void> callback;
public static Builder builder() {
@ -49,7 +52,7 @@ public class TimeseriesSaveRequest {
private List<TsKvEntry> entries;
private long ttl;
private FutureCallback<Void> callback;
private boolean saveLatest;
private boolean saveLatest = true;
Builder() {}

View File

@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.TbMsg;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
@ -145,8 +144,7 @@ public class TbMathNode implements TbNode {
return ctx.getTelemetryService().saveAndNotify(TimeseriesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.entries(Collections.singletonList(basicTsKvEntry))
.saveLatest(true)
.entry(basicTsKvEntry)
.build());
}