diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 2302446b6b..30f2885e78 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -177,8 +177,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } - if (strategy.saveLatest()) { - copyLatestToEntityViews(tenantId, entityId, request.getEntries()); + if (strategy.saveLatest() && entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) { + addMainCallback(resultFuture, __ -> copyLatestToEntityViews(tenantId, entityId, request.getEntries())); } return resultFuture; } @@ -333,58 +333,56 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List ts) { - if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { - Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), - new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List result) { - if (result != null && !result.isEmpty()) { - Map> tsMap = new HashMap<>(); - for (TsKvEntry entry : ts) { - tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); + Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), + new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + if (result != null && !result.isEmpty()) { + Map> tsMap = new HashMap<>(); + for (TsKvEntry entry : ts) { + tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); + } + for (EntityView entityView : result) { + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet()); + List entityViewLatest = new ArrayList<>(); + long startTs = entityView.getStartTimeMs(); + long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); + for (String key : keys) { + List entries = tsMap.get(key); + if (entries != null) { + Optional tsKvEntry = entries.stream() + .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) + .max(comparingLong(TsKvEntry::getTs)); + tsKvEntry.ifPresent(entityViewLatest::add); + } } - for (EntityView entityView : result) { - List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? - entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet()); - List entityViewLatest = new ArrayList<>(); - long startTs = entityView.getStartTimeMs(); - long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); - for (String key : keys) { - List entries = tsMap.get(key); - if (entries != null) { - Optional tsKvEntry = entries.stream() - .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) - .max(comparingLong(TsKvEntry::getTs)); - tsKvEntry.ifPresent(entityViewLatest::add); - } - } - if (!entityViewLatest.isEmpty()) { - saveTimeseries(TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityView.getId()) - .entries(entityViewLatest) - .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) - .callback(new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) {} + if (!entityViewLatest.isEmpty()) { + saveTimeseries(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityView.getId()) + .entries(entityViewLatest) + .strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) {} - @Override - public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t); - } - }) - .build()); - } + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t); + } + }) + .build()); } } } + } - @Override - public void onFailure(Throwable t) { - log.error("Error while finding entity views by tenantId and entityId", t); - } - }, MoreExecutors.directExecutor()); - } + @Override + public void onFailure(Throwable t) { + log.error("Error while finding entity views by tenantId and entityId", t); + } + }, MoreExecutors.directExecutor()); } private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List attributes) { diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index 64845a12cd..2b4d9f38e5 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -359,6 +359,45 @@ class DefaultTelemetrySubscriptionServiceTest { then(subscriptionManagerService).shouldHaveNoInteractions(); } + @Test + void shouldNotCopyLatestToEntityViewWhenTimeseriesSaveFailedOnMainEntity() { + // GIVEN + var entityView = new EntityView(new EntityViewId(UUID.randomUUID())); + entityView.setTenantId(tenantId); + entityView.setCustomerId(customerId); + entityView.setEntityId(entityId); + entityView.setKeys(new TelemetryEntityView(sampleTimeseries.stream().map(KvEntry::getKey).toList(), new AttributesEntityView())); + + // mock that there is one entity view + lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(List.of(entityView))); + // mock that save latest call for entity view is successful + lenient().when(tsService.saveLatest(tenantId, entityView.getId(), sampleTimeseries)).thenReturn(immediateFuture(TimeseriesSaveResult.of(sampleTimeseries.size(), listOfNNumbers(sampleTimeseries.size())))); + // mock TPI for entity view + lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).thenReturn(tpi); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(entityId) + .entries(sampleTimeseries) + .ttl(sampleTtl) + .strategy(new TimeseriesSaveRequest.Strategy(true, true, false, false)) + .build(); + + given(tsService.save(tenantId, entityId, sampleTimeseries, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save data on main entity"))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + // should save only time series for the main entity + then(tsService).should().save(tenantId, entityId, sampleTimeseries, sampleTtl); + then(tsService).shouldHaveNoMoreInteractions(); + + // should not send any WS updates + then(subscriptionManagerService).shouldHaveNoInteractions(); + } + @ParameterizedTest @MethodSource("allCombinationsOfFourBooleans") void shouldCallCorrectSaveTimeseriesApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java index 1c56aa25dd..93e754eb2c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EntityType.java @@ -97,4 +97,5 @@ public enum EntityType { } return false; } + }