From fbdbea7f598899d2da21975712f7a0df183ea956 Mon Sep 17 00:00:00 2001 From: Dima Landiak Date: Fri, 12 Nov 2021 14:09:39 +0200 Subject: [PATCH] refactoring --- .../DefaultTelemetrySubscriptionService.java | 77 +++++++++---------- .../dao/timeseries/BaseTimeseriesService.java | 32 ++++---- .../engine/telemetry/TbMsgTimeseriesNode.java | 9 ++- 3 files changed, 59 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index df1421798e..232f8bfd00 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -20,8 +20,10 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; @@ -45,7 +47,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import javax.annotation.Nullable; @@ -118,57 +119,44 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback) { - checkInternalEntity(entityId); - boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; - if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { - saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback() { - @Override - public void onSuccess(Integer result) { - if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); - } - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }); - } else { - callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); - } + doSaveAndNotify(tenantId, customerId, entityId, ts, ttl, callback, true); } @Override public void saveWithoutLatestAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + doSaveAndNotify(tenantId, customerId, entityId, ts, ttl, callback, false); + } + + private void doSaveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback, boolean saveLatest) { checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { - saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback() { - @Override - public void onSuccess(Integer result) { - if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); - } - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }); + if (saveLatest) { + saveAndNotifyInternal(tenantId, entityId, ts, ttl, getCallback(tenantId, customerId, sysTenant, callback)); + } else { + saveWithoutLatestAndNotifyInternal(tenantId, entityId, ts, ttl, getCallback(tenantId, customerId, sysTenant, callback)); + } } else { callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); } } - private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { - ListenableFuture saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl); - addMainCallback(saveFuture, callback); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); - // TODO: 12/11/2021 do we need entityView searching here? + @NotNull + private FutureCallback getCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback callback) { + return new FutureCallback<>() { + @Override + public void onSuccess(Integer result) { + if (!sysTenant && result != null && result > 0) { + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + } + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }; } @Override @@ -179,6 +167,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { ListenableFuture saveFuture = tsService.save(tenantId, entityId, ts, ttl); + addCallbacks(tenantId, entityId, ts, callback, saveFuture); + } + + private void saveWithoutLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + ListenableFuture saveFuture = tsService.saveWithoutLatest(tenantId, entityId, ts, ttl); + addCallbacks(tenantId, entityId, ts, callback, saveFuture); + } + + private void addCallbacks(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback, ListenableFuture saveFuture) { addMainCallback(saveFuture, callback); addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 48cced688a..da3fe05024 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -145,24 +145,26 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY); - for (TsKvEntry tsKvEntry : tsKvEntries) { - if (tsKvEntry == null) { - throw new IncorrectParameterException("Key value entry can't be null"); - } - saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); - } - return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); + return doSave(tenantId, entityId, tsKvEntries, ttl, true); } @Override public ListenableFuture saveWithoutLatest(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl) { - List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * INSERTS_PER_ENTRY_WITHOUT_LATEST); + return doSave(tenantId, entityId, tsKvEntries, ttl, false); + } + + private ListenableFuture doSave(TenantId tenantId, EntityId entityId, List tsKvEntries, long ttl, boolean saveLatest) { + int inserts = saveLatest ? INSERTS_PER_ENTRY : INSERTS_PER_ENTRY_WITHOUT_LATEST; + List> futures = Lists.newArrayListWithExpectedSize(tsKvEntries.size() * inserts); for (TsKvEntry tsKvEntry : tsKvEntries) { if (tsKvEntry == null) { throw new IncorrectParameterException("Key value entry can't be null"); } - saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); + if (saveLatest) { + saveAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); + } else { + saveWithoutLatestAndRegisterFutures(tenantId, futures, entityId, tsKvEntry, ttl); + } } return Futures.transform(Futures.allAsList(futures), SUM_ALL_INTEGERS, MoreExecutors.directExecutor()); } @@ -180,15 +182,15 @@ public class BaseTimeseriesService implements TimeseriesService { } private void saveAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { - throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); - } - futures.add(timeseriesDao.savePartition(tenantId, entityId, tsKvEntry.getTs(), tsKvEntry.getKey())); + doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl); futures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), v -> 0, MoreExecutors.directExecutor())); - futures.add(timeseriesDao.save(tenantId, entityId, tsKvEntry, ttl)); } private void saveWithoutLatestAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { + doSaveAndRegisterFuturesFor(tenantId, futures, entityId, tsKvEntry, ttl); + } + + private void doSaveAndRegisterFuturesFor(TenantId tenantId, List> futures, EntityId entityId, TsKvEntry tsKvEntry, long ttl) { if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { throw new IncorrectParameterException("Telemetry data can't be stored for entity view. Read only"); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index dfc078b86a..c5f978841a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -23,9 +23,8 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -46,7 +45,9 @@ import java.util.concurrent.TimeUnit; name = "save timeseries", configClazz = TbMsgTimeseriesNodeConfiguration.class, nodeDescription = "Saves timeseries data", - nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type", + nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY_REQUEST' message type. " + + "Timestamp in milliseconds will be taken from metadata.ts, otherwise 'now' timestamp will be applied. " + + "Allows stopping updating values for incoming keys in the latest ts_kv table if 'skipLatestPersistence' is set to true.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeCustomTimeseriesConfig", icon = "file_upload" @@ -107,7 +108,7 @@ public class TbMsgTimeseriesNode implements TbNode { if (!StringUtils.isEmpty(tsStr)) { try { ts = Long.parseLong(tsStr); - } catch (NumberFormatException e) { + } catch (NumberFormatException ignored) { } } else { ts = msg.getTs();