From 6c0bca2baef4f5a0f03e04f0d12f63a1f6f90436 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 17 Dec 2024 12:25:57 +0200 Subject: [PATCH] Refactor saveAndNotifyInternal for timeseries; save latest by default --- .../controller/TelemetryController.java | 1 - .../DefaultTbApiUsageStateService.java | 33 +++++-- .../service/edge/rpc/EdgeGrpcService.java | 7 +- .../ota/DefaultOtaPackageStateService.java | 7 +- .../state/DefaultDeviceStateService.java | 23 +++-- .../DefaultRuleEngineStatisticsService.java | 22 ++++- .../csv/AbstractBulkImportService.java | 3 +- .../system/DefaultSystemInfoService.java | 13 ++- .../DefaultTelemetrySubscriptionService.java | 95 +++++++++---------- .../telemetry/InternalTelemetryService.java | 6 +- .../server/controller/WebsocketApiTest.java | 3 - .../engine/api/TimeseriesSaveRequest.java | 9 +- .../rule/engine/math/TbMathNode.java | 4 +- 13 files changed, 130 insertions(+), 96 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 8c63be6de9..f72b9684e2 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -679,7 +679,6 @@ public class TelemetryController extends BaseController { .entityId(entityId) .entries(entries) .ttl(tenantTtl) - .saveLatest(true) .callback(new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 60bf529ace..d9c5077ab7 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -27,6 +27,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.MailService; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordState; @@ -91,9 +92,9 @@ import java.util.stream.Collectors; public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService implements TbApiUsageStateService { public static final String HOURLY = "Hourly"; - public static final FutureCallback VOID_CALLBACK = new FutureCallback() { + public static final FutureCallback VOID_CALLBACK = new FutureCallback() { @Override - public void onSuccess(@Nullable Integer result) { + public void onSuccess(@Nullable Void result) { } @Override @@ -214,7 +215,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService updateLock.unlock(); } log.trace("[{}][{}] Saving new stats: {}", tenantId, ownerId, updatedEntries); - tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); + tsWsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(usageState.getApiUsageState().getId()) + .entries(updatedEntries) + .callback(VOID_CALLBACK) + .build()); if (!result.isEmpty()) { persistAndNotify(usageState, result); } @@ -321,7 +327,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } } if (!profileThresholds.isEmpty()) { - tsWsService.saveAndNotifyInternal(tenantId, id, profileThresholds, VOID_CALLBACK); + tsWsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(id) + .entries(profileThresholds) + .callback(VOID_CALLBACK) + .build()); } } @@ -348,7 +359,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService long ts = System.currentTimeMillis(); List stateTelemetry = new ArrayList<>(); result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))); - tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK); + tsWsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(state.getTenantId()) + .entityId(state.getApiUsageState().getId()) + .entries(stateTelemetry) + .callback(VOID_CALLBACK) + .build()); if (state.getEntityType() == EntityType.TENANT && !state.getEntityId().equals(TenantId.SYS_TENANT_ID)) { String email = tenantService.findTenantById(state.getTenantId()).getEmail(); @@ -436,7 +452,12 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) .collect(Collectors.toList()); - tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK); + tsWsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(state.getTenantId()) + .entityId(state.getApiUsageState().getId()) + .entries(counts) + .callback(VOID_CALLBACK) + .build()); } BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId ownerId) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index d451bb36b3..dd2216dd5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -69,7 +69,6 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -507,8 +506,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i tsSubService.save(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) - .entries(Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value)))) - .saveLatest(true) + .entry(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { @@ -522,8 +520,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i tsSubService.save(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) - .entries(Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value)))) - .saveLatest(true) + .entry(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { diff --git a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java index 78da4eee7f..afae615333 100644 --- a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java @@ -55,7 +55,6 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -266,7 +265,6 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { .tenantId(tenantId) .entityId(deviceId) .entries(telemetry) - .saveLatest(true) .callback(new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { @@ -292,9 +290,8 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { telemetryService.save(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) - .entries(Collections.singletonList(status)) - .saveLatest(true) - .callback(new FutureCallback() { + .entry(status) + .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { log.trace("[{}] Success save telemetry with target {} for device!", deviceId, otaPackage); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 6025e874b9..4f230f9d6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -38,6 +38,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.AttributeScope; @@ -866,10 +867,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); + tsSubService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(TenantId.SYS_TENANT_ID) + .entityId(deviceId) + .entry(new BasicTsKvEntry(getCurrentTimeMillis(), new LongDataEntry(key, value))) + .ttl(telemetryTtl) + .callback(new TelemetrySaveCallback<>(deviceId, key, value)) + .build()); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); } @@ -877,10 +881,13 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); + tsSubService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(TenantId.SYS_TENANT_ID) + .entityId(deviceId) + .entry(new BasicTsKvEntry(getCurrentTimeMillis(), new BooleanDataEntry(key, value))) + .ttl(telemetryTtl) + .callback(new TelemetrySaveCallback<>(deviceId, key, value)) + .build()); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, AttributeScope.SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); } diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java index e0c78e0afa..2a214388a1 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.id.QueueStatsId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -37,7 +38,6 @@ import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; -import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -53,9 +53,9 @@ import java.util.stream.Collectors; public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService { public static final String RULE_ENGINE_EXCEPTION = "ruleEngineException"; - public static final FutureCallback CALLBACK = new FutureCallback() { + public static final FutureCallback CALLBACK = new FutureCallback() { @Override - public void onSuccess(@Nullable Integer result) { + public void onSuccess(@Nullable Void result) { } @@ -89,7 +89,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS if (!tsList.isEmpty()) { long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getQueueStatsTtlDays); ttl = TimeUnit.DAYS.toSeconds(ttl); - tsService.saveAndNotifyInternal(tenantId, queueStatsId, tsList, ttl, CALLBACK); + tsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(queueStatsId) + .entries(tsList) + .ttl(ttl) + .callback(CALLBACK) + .build()); } } } catch (Exception e) { @@ -103,7 +109,13 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry(RULE_ENGINE_EXCEPTION, e.toJsonString(maxErrorMessageLength))); long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getRuleEngineExceptionsTtlDays); ttl = TimeUnit.DAYS.toSeconds(ttl); - tsService.saveAndNotifyInternal(tenantId, getQueueStatsId(tenantId, queueName), Collections.singletonList(tsKv), ttl, CALLBACK); + tsService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(getQueueStatsId(tenantId, queueName)) + .entry(tsKv) + .ttl(ttl) + .callback(CALLBACK) + .build()); } catch (Exception e2) { if (!"Asset is referencing to non-existent tenant!".equalsIgnoreCase(e2.getMessage())) { log.debug("[{}] Failed to store the statistics", tenantId, e2); diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java index 9f7d4c0843..6d04c3173f 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java @@ -213,8 +213,7 @@ public abstract class AbstractBulkImportService() { + .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { entityActionService.logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, diff --git a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java index 4016352f2d..b59c723996 100644 --- a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java +++ b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java @@ -27,6 +27,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.SmsService; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.FeaturesInfo; @@ -71,9 +72,9 @@ import static org.thingsboard.common.util.SystemUtil.getTotalMemory; @Slf4j public class DefaultSystemInfoService extends TbApplicationEventListener implements SystemInfoService { - public static final FutureCallback CALLBACK = new FutureCallback<>() { + public static final FutureCallback CALLBACK = new FutureCallback<>() { @Override - public void onSuccess(@Nullable Integer result) { + public void onSuccess(@Nullable Void result) { } @Override @@ -200,7 +201,13 @@ public class DefaultSystemInfoService extends TbApplicationEventListener telemetry) { ApiUsageState apiUsageState = apiUsageStateClient.getApiUsageState(TenantId.SYS_TENANT_ID); - telemetryService.saveAndNotifyInternal(TenantId.SYS_TENANT_ID, apiUsageState.getId(), telemetry, systemInfoTtlSeconds, CALLBACK); + telemetryService.saveInternal(TimeseriesSaveRequest.builder() + .tenantId(TenantId.SYS_TENANT_ID) + .entityId(apiUsageState.getId()) + .entries(telemetry) + .ttl(systemInfoTtlSeconds) + .callback(CALLBACK) + .build()); } private List getSystemData(ServiceInfo serviceInfo) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 682bddd436..1ce0550ee2 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -116,6 +116,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer super.shutdownExecutor(); } + @Override + public void save(TimeseriesSaveRequest request) { + TenantId tenantId = request.getTenantId(); + EntityId entityId = request.getEntityId(); + checkInternalEntity(entityId); + boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; + if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + KvUtils.validate(request.getEntries(), valueNoXssValidation); + FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); + ListenableFuture future = saveInternal(request); + Futures.addCallback(future, callback, tsCallBackExecutor); + } else { + request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); + } + } + @Override public ListenableFuture saveAndNotify(TimeseriesSaveRequest request) { SettableFuture future = SettableFuture.create(); @@ -125,58 +141,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void save(TimeseriesSaveRequest request) { + public ListenableFuture saveInternal(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); - checkInternalEntity(entityId); - boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; - if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { - KvUtils.validate(request.getEntries(), valueNoXssValidation); - FutureCallback callback = getCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); - if (request.isSaveLatest()) { - saveAndNotifyInternal(tenantId, entityId, request.getEntries(), request.getTtl(), callback); - } else { - saveWithoutLatestAndNotifyInternal(tenantId, entityId, request.getEntries(), request.getTtl(), callback); - } + ListenableFuture saveFuture; + if (request.isSaveLatest()) { + saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); } else { - request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); + saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); } - } - private FutureCallback getCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback callback) { - return new FutureCallback<>() { - @Override - public void onSuccess(Integer result) { - if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); - } - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }; - } - - @Override - public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { - saveAndNotifyInternal(tenantId, entityId, ts, 0L, callback); - } - - @Override - public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { - ListenableFuture saveFuture = tsService.save(tenantId, entityId, ts, ttl); - addMainCallback(saveFuture, callback); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); - addEntityViewCallback(tenantId, entityId, ts); - } - - private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { - ListenableFuture saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl); - addMainCallback(saveFuture, callback); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); + addMainCallback(saveFuture, request.getCallback()); + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); + if (request.isSaveLatest()) { + addEntityViewCallback(tenantId, entityId, request.getEntries()); + } + return saveFuture; } private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List ts) { @@ -452,11 +432,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }, tsCallBackExecutor); } - private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { + private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { Futures.addCallback(saveFuture, new FutureCallback() { @Override public void onSuccess(@Nullable S result) { - callback.onSuccess(result); + callback.onSuccess(null); } @Override @@ -472,6 +452,23 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } + private FutureCallback getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback callback) { + return new FutureCallback<>() { + @Override + public void onSuccess(Integer result) { + if (!sysTenant && result != null && result > 0) { + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + } + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; + } + private static class VoidFutureCallback implements FutureCallback { private final SettableFuture future; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index a6dc6d1aae..c861efb46f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -16,7 +16,9 @@ package org.thingsboard.server.service.telemetry; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -30,9 +32,7 @@ import java.util.List; */ public interface InternalTelemetryService extends RuleEngineTelemetryService { - void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - - void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback); + ListenableFuture saveInternal(TimeseriesSaveRequest request); @Deprecated(since = "3.7.0") void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, FutureCallback callback); diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index e32d6cbe5a..40c7e435ca 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -807,11 +807,8 @@ public class WebsocketApiTest extends AbstractControllerTest { CountDownLatch latch = new CountDownLatch(1); tsService.save(TimeseriesSaveRequest.builder() .tenantId(device.getTenantId()) - .customerId(null) .entityId(device.getId()) .entries(tsData) - .ttl(0) - .saveLatest(true) .callback(new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 042915c95b..bcf021d3ca 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -18,7 +18,8 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; import lombok.AccessLevel; import lombok.AllArgsConstructor; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -26,15 +27,17 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; -@Data +@Getter @AllArgsConstructor(access = AccessLevel.PRIVATE) public class TimeseriesSaveRequest { + private final TenantId tenantId; private final CustomerId customerId; private final EntityId entityId; private final List entries; private final long ttl; private final boolean saveLatest; + @Setter private FutureCallback callback; public static Builder builder() { @@ -49,7 +52,7 @@ public class TimeseriesSaveRequest { private List entries; private long ttl; private FutureCallback callback; - private boolean saveLatest; + private boolean saveLatest = true; Builder() {} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index 83d82534a7..893b06bfec 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.TbMsg; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentMap; @@ -145,8 +144,7 @@ public class TbMathNode implements TbNode { return ctx.getTelemetryService().saveAndNotify(TimeseriesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) - .entries(Collections.singletonList(basicTsKvEntry)) - .saveLatest(true) + .entry(basicTsKvEntry) .build()); }