diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a3771988cf..df1421798e 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -140,6 +140,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } + @Override + public void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + checkInternalEntity(entityId); + boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; + if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback() { + @Override + public void onSuccess(Integer result) { + if (!sysTenant && result != null && result > 0) { + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + } + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); + } else { + callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); + } + } + + private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + ListenableFuture saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl); + addMainCallback(saveFuture, callback); + addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); + // TODO: 12/11/2021 do we need entityView searching here? + } + @Override public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { saveAndNotifyInternal(tenantId, entityId, ts, 0L, callback); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index b1a2541fc7..4418a826f3 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -41,6 +41,8 @@ public interface TimeseriesService { ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntry, long ttl); + ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 3170f6c256..48cced688a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -56,6 +56,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; public class BaseTimeseriesService implements TimeseriesService { private static final int INSERTS_PER_ENTRY = 3; + private static final int INSERTS_PER_ENTRY_WITHOUT_LATEST = 2; private static final int DELETES_PER_ENTRY = INSERTS_PER_ENTRY; public static final Function, Integer> SUM_ALL_INTEGERS = new Function, Integer>() { @Override @@ -154,6 +155,18 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); } + @Override + public ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { + List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST); + for (TsKvEntry tsKvEntry : tsKvEntries) { + if (tsKvEntry == null) { + throw new IncorrectParameterException("Key value entry can't be null"); + } + saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); + } + return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); + } + @Override public ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntries) { List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size()); @@ -175,6 +188,14 @@ public class BaseTimeseriesService implements TimeseriesService { futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); } + private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { + if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { + throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); + } + futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); + futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); + } + private List updateQueriesForEntityView(EntityView entityView, List queries) { return queries.stream().map(query -> { long startTs; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index e641c83144..b2fd7d2a97 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -34,6 +34,8 @@ public interface RuleEngineTelemetryService { void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List ts, long ttl, FutureCallback callback); + void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List ts, long ttl, FutureCallback callback); + void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback); void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, FutureCallback callback); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index c99a575cc3..dfc078b86a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit; nodeDescription = "Saves timeseries data", nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type", uiResources = {"static/rulenode/rulenode-core-config.js"}, - configDirective = "tbActionNodeTimeseriesConfig", + configDirective = "tbActionNodeCustomTimeseriesConfig", icon = "file_upload" ) public class TbMsgTimeseriesNode implements TbNode { @@ -94,7 +94,11 @@ public class TbMsgTimeseriesNode implements TbNode { if (ttl == 0L) { ttl = tenantProfileDefaultStorageTtl; } - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + if (config.isSkipLatestPersistence()) { + ctx.getTelemetryService().saveWithoutLatestAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + } else { + ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + } } public static long getTs(TbMsg msg) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java index fb63fbdbde..bb661df905 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeConfiguration.java @@ -22,11 +22,13 @@ import org.thingsboard.rule.engine.api.NodeConfiguration; public class TbMsgTimeseriesNodeConfiguration implements NodeConfiguration { private long defaultTTL; + private boolean skipLatestPersistence; @Override public TbMsgTimeseriesNodeConfiguration defaultConfiguration() { TbMsgTimeseriesNodeConfiguration configuration = new TbMsgTimeseriesNodeConfiguration(); configuration.setDefaultTTL(0L); + configuration.setSkipLatestPersistence(false); return configuration; } }