Merge pull request #12611 from dskarzh/copy-latest-to-view-fix
Do not copy latest to entity views when data was not saved on the main entity
This commit is contained in:
commit
ef0a0a2d50
@ -177,8 +177,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
if (strategy.sendWsUpdate()) {
|
if (strategy.sendWsUpdate()) {
|
||||||
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
||||||
}
|
}
|
||||||
if (strategy.saveLatest()) {
|
if (strategy.saveLatest() && entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) {
|
||||||
copyLatestToEntityViews(tenantId, entityId, request.getEntries());
|
addMainCallback(resultFuture, __ -> copyLatestToEntityViews(tenantId, entityId, request.getEntries()));
|
||||||
}
|
}
|
||||||
return resultFuture;
|
return resultFuture;
|
||||||
}
|
}
|
||||||
@ -333,58 +333,56 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
|
private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
|
||||||
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
|
Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
|
||||||
Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
|
new FutureCallback<>() {
|
||||||
new FutureCallback<>() {
|
@Override
|
||||||
@Override
|
public void onSuccess(@Nullable List<EntityView> result) {
|
||||||
public void onSuccess(@Nullable List<EntityView> result) {
|
if (result != null && !result.isEmpty()) {
|
||||||
if (result != null && !result.isEmpty()) {
|
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
|
||||||
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
|
for (TsKvEntry entry : ts) {
|
||||||
for (TsKvEntry entry : ts) {
|
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
|
||||||
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
|
}
|
||||||
|
for (EntityView entityView : result) {
|
||||||
|
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
|
||||||
|
entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
|
||||||
|
List<TsKvEntry> entityViewLatest = new ArrayList<>();
|
||||||
|
long startTs = entityView.getStartTimeMs();
|
||||||
|
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
|
||||||
|
for (String key : keys) {
|
||||||
|
List<TsKvEntry> entries = tsMap.get(key);
|
||||||
|
if (entries != null) {
|
||||||
|
Optional<TsKvEntry> tsKvEntry = entries.stream()
|
||||||
|
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
|
||||||
|
.max(comparingLong(TsKvEntry::getTs));
|
||||||
|
tsKvEntry.ifPresent(entityViewLatest::add);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (EntityView entityView : result) {
|
if (!entityViewLatest.isEmpty()) {
|
||||||
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
|
saveTimeseries(TimeseriesSaveRequest.builder()
|
||||||
entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
|
.tenantId(tenantId)
|
||||||
List<TsKvEntry> entityViewLatest = new ArrayList<>();
|
.entityId(entityView.getId())
|
||||||
long startTs = entityView.getStartTimeMs();
|
.entries(entityViewLatest)
|
||||||
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
|
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
|
||||||
for (String key : keys) {
|
.callback(new FutureCallback<>() {
|
||||||
List<TsKvEntry> entries = tsMap.get(key);
|
@Override
|
||||||
if (entries != null) {
|
public void onSuccess(@Nullable Void tmp) {}
|
||||||
Optional<TsKvEntry> tsKvEntry = entries.stream()
|
|
||||||
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
|
|
||||||
.max(comparingLong(TsKvEntry::getTs));
|
|
||||||
tsKvEntry.ifPresent(entityViewLatest::add);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!entityViewLatest.isEmpty()) {
|
|
||||||
saveTimeseries(TimeseriesSaveRequest.builder()
|
|
||||||
.tenantId(tenantId)
|
|
||||||
.entityId(entityView.getId())
|
|
||||||
.entries(entityViewLatest)
|
|
||||||
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
|
|
||||||
.callback(new FutureCallback<>() {
|
|
||||||
@Override
|
|
||||||
public void onSuccess(@Nullable Void tmp) {}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t);
|
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.build());
|
.build());
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.error("Error while finding entity views by tenantId and entityId", t);
|
log.error("Error while finding entity views by tenantId and entityId", t);
|
||||||
}
|
}
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
|
||||||
|
|||||||
@ -359,6 +359,45 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
then(subscriptionManagerService).shouldHaveNoInteractions();
|
then(subscriptionManagerService).shouldHaveNoInteractions();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldNotCopyLatestToEntityViewWhenTimeseriesSaveFailedOnMainEntity() {
|
||||||
|
// GIVEN
|
||||||
|
var entityView = new EntityView(new EntityViewId(UUID.randomUUID()));
|
||||||
|
entityView.setTenantId(tenantId);
|
||||||
|
entityView.setCustomerId(customerId);
|
||||||
|
entityView.setEntityId(entityId);
|
||||||
|
entityView.setKeys(new TelemetryEntityView(sampleTimeseries.stream().map(KvEntry::getKey).toList(), new AttributesEntityView()));
|
||||||
|
|
||||||
|
// mock that there is one entity view
|
||||||
|
lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(List.of(entityView)));
|
||||||
|
// mock that save latest call for entity view is successful
|
||||||
|
lenient().when(tsService.saveLatest(tenantId, entityView.getId(), sampleTimeseries)).thenReturn(immediateFuture(TimeseriesSaveResult.of(sampleTimeseries.size(), listOfNNumbers(sampleTimeseries.size()))));
|
||||||
|
// mock TPI for entity view
|
||||||
|
lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).thenReturn(tpi);
|
||||||
|
|
||||||
|
var request = TimeseriesSaveRequest.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.customerId(customerId)
|
||||||
|
.entityId(entityId)
|
||||||
|
.entries(sampleTimeseries)
|
||||||
|
.ttl(sampleTtl)
|
||||||
|
.strategy(new TimeseriesSaveRequest.Strategy(true, true, false, false))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
given(tsService.save(tenantId, entityId, sampleTimeseries, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save data on main entity")));
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
telemetryService.saveTimeseries(request);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
// should save only time series for the main entity
|
||||||
|
then(tsService).should().save(tenantId, entityId, sampleTimeseries, sampleTtl);
|
||||||
|
then(tsService).shouldHaveNoMoreInteractions();
|
||||||
|
|
||||||
|
// should not send any WS updates
|
||||||
|
then(subscriptionManagerService).shouldHaveNoInteractions();
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("allCombinationsOfFourBooleans")
|
@MethodSource("allCombinationsOfFourBooleans")
|
||||||
void shouldCallCorrectSaveTimeseriesApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) {
|
void shouldCallCorrectSaveTimeseriesApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) {
|
||||||
|
|||||||
@ -97,4 +97,5 @@ public enum EntityType {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user