From 54afdaa07a9593d975577ac5e386505f274918f2 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Thu, 6 Feb 2025 15:15:38 +0200 Subject: [PATCH 1/2] Do not copy latest to entity views when data was not saved on the main entity --- .../DefaultTelemetrySubscriptionService.java | 10 +++-- ...faultTelemetrySubscriptionServiceTest.java | 41 +++++++++++++++++++ .../server/common/data/EntityType.java | 12 ++++++ .../thingsboard/common/util/DonAsynchron.java | 6 +++ 4 files changed, 66 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index cab2f18dc7..43e4b66c40 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -151,7 +151,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } if (strategy.saveLatest()) { - copyLatestToEntityViews(tenantId, entityId, request.getEntries()); + addMainCallback(saveFuture, __ -> copyLatestToEntityViews(tenantId, entityId, request.getEntries())); } return saveFuture; } @@ -207,8 +207,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List ts) { - if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { - Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), + if (entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) { + Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() { @Override public void onSuccess(@Nullable List result) { @@ -312,6 +312,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure); } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess) { + DonAsynchron.withCallback(saveFuture, onSuccess, null, tsCallBackExecutor); + } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess, Consumer onFailure) { DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor); } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 10fdd85504..4ded639c76 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -71,6 +71,7 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.given; @@ -318,6 +319,46 @@ class DefaultTelemetrySubscriptionServiceTest { then(subscriptionManagerService).shouldHaveNoInteractions(); } + @Test + void shouldNotCopyLatestToEntityViewWhenTimeseriesSaveFailedOnMainEntity() { + // GIVEN + var entityView = new EntityView(new EntityViewId(UUID.randomUUID())); + entityView.setTenantId(tenantId); + entityView.setCustomerId(customerId); + entityView.setEntityId(entityId); + entityView.setKeys(new TelemetryEntityView(sampleTelemetry.stream().map(KvEntry::getKey).toList(), new AttributesEntityView())); + + // mock that there is one entity view + lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(List.of(entityView))); + // mock that save latest call for entity view is successful + lenient().when(tsService.saveLatest(tenantId, entityView.getId(), sampleTelemetry)).thenReturn(immediateFuture(listOfNNumbers(sampleTelemetry.size()))); + // mock TPI for entity view + lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).thenReturn(tpi); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTelemetry) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(true, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.save(tenantId, entityId, sampleTelemetry, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save latest on main entity"))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + // should save only time series for the main entity + then(tsService).should().save(tenantId, entityId, sampleTelemetry, sampleTtl); + then(tsService).shouldHaveNoMoreInteractions(); + + // should not send any WS updates + then(subscriptionManagerService).shouldHaveNoInteractions(); + } + @ParameterizedTest @MethodSource("booleanCombinations") void shouldCallCorrectApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java index 579fa863ba..84fc59022b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java @@ -84,4 +84,16 @@ public enum EntityType { this.tableName = tableName; } + public boolean isOneOf(EntityType... types) { + if (types == null) { + return false; + } + for (EntityType type : types) { + if (this == type) { + return true; + } + } + return false; + } + } diff --git a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java index f538d04f15..0e1193ef99 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java +++ b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java @@ -36,6 +36,9 @@ public class DonAsynchron { FutureCallback callback = new FutureCallback() { @Override public void onSuccess(T result) { + if (onSuccess == null) { + return; + } try { onSuccess.accept(result); } catch (Throwable th) { @@ -45,6 +48,9 @@ public class DonAsynchron { @Override public void onFailure(Throwable t) { + if (onFailure == null) { + return; + } onFailure.accept(t); } }; From b2b33199fea5fcf6940a2aca642931f332942376 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Thu, 6 Feb 2025 15:55:41 +0200 Subject: [PATCH 2/2] Corrections after code review --- .../DefaultTelemetrySubscriptionService.java | 94 +++++++++---------- ...faultTelemetrySubscriptionServiceTest.java | 2 +- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 43e4b66c40..5b5054ae38 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -150,7 +150,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (strategy.sendWsUpdate()) { addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } - if (strategy.saveLatest()) { + if (strategy.saveLatest() && entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) { addMainCallback(saveFuture, __ -> copyLatestToEntityViews(tenantId, entityId, request.getEntries())); } return saveFuture; @@ -207,58 +207,56 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List ts) { - if (entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) { - Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List result) { - if (result != null && !result.isEmpty()) { - Map> tsMap = new HashMap<>(); - for (TsKvEntry entry : ts) { - tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); + Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), + new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + if (result != null && !result.isEmpty()) { + Map> tsMap = new HashMap<>(); + for (TsKvEntry entry : ts) { + tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); + } + for (EntityView entityView : result) { + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet()); + List entityViewLatest = new ArrayList<>(); + long startTs = entityView.getStartTimeMs(); + long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); + for (String key : keys) { + List entries = tsMap.get(key); + if (entries != null) { + Optional tsKvEntry = entries.stream() + .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) + .max(Comparator.comparingLong(TsKvEntry::getTs)); + tsKvEntry.ifPresent(entityViewLatest::add); + } } - for (EntityView entityView : result) { - List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? - entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet()); - List entityViewLatest = new ArrayList<>(); - long startTs = entityView.getStartTimeMs(); - long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); - for (String key : keys) { - List entries = tsMap.get(key); - if (entries != null) { - Optional tsKvEntry = entries.stream() - .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) - .max(Comparator.comparingLong(TsKvEntry::getTs)); - tsKvEntry.ifPresent(entityViewLatest::add); - } - } - if (!entityViewLatest.isEmpty()) { - saveTimeseries(TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityView.getId()) - .entries(entityViewLatest) - .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) - .callback(new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) {} + if (!entityViewLatest.isEmpty()) { + saveTimeseries(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityView.getId()) + .entries(entityViewLatest) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) {} - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t); - } - }) - .build()); - } + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t); + } + }) + .build()); } } } + } - @Override - public void onFailure(Throwable t) { - log.error("Error while finding entity views by tenantId and entityId", t); - } - }, MoreExecutors.directExecutor()); - } + @Override + public void onFailure(Throwable t) { + log.error("Error while finding entity views by tenantId and entityId", t); + } + }, MoreExecutors.directExecutor()); } private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice) { @@ -313,7 +311,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess) { - DonAsynchron.withCallback(saveFuture, onSuccess, null, tsCallBackExecutor); + addMainCallback(saveFuture, onSuccess, null); } private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess, Consumer onFailure) { diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 4ded639c76..60a16c16a4 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -345,7 +345,7 @@ class DefaultTelemetrySubscriptionServiceTest { .callback(emptyCallback) .build(); - given(tsService.save(tenantId, entityId, sampleTelemetry, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save latest on main entity"))); + given(tsService.save(tenantId, entityId, sampleTelemetry, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save data on main entity"))); // WHEN telemetryService.saveTimeseries(request);