diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index a371582409..6fc6d5bad9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -152,8 +152,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityData = new HashMap<>(); attributes = JacksonUtil.newObjectNode(); for (AttributeKvEntry attr : ssAttributes) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey()) - && !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) { + if (DefaultDeviceStateService.ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT.contains(attr.getKey())) { continue; } if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) { @@ -200,7 +199,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } Map> tsData = new HashMap<>(); for (TsKvEntry tsKvEntry : tsKvEntries) { - if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) { + if (DefaultDeviceStateService.ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT.contains(tsKvEntry.getKey())) { continue; } tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue()); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 11819ce5d8..f8cb1a1059 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -96,6 +96,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -129,11 +130,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService PERSISTENT_TELEMETRY_KEYS = Arrays.asList( new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME), new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME), - new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT), new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE), new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME), new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME), - new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT)); // inactivity timeout is always a server attribute, even when activity data is stored as time series private static final List PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList( new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME), @@ -143,8 +143,14 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, - LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT); + public static final List ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT = List.of( + ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME + ); + + public static final List ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT = List.of( + ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT + ); + private static final List PERSISTENT_ENTITY_FIELDS = Arrays.asList( new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), new EntityKey(EntityKeyType.ENTITY_FIELD, "type"), @@ -643,41 +649,45 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService fetchDeviceState(Device device) { ListenableFuture future; if (persistToTelemetry) { - ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - future = Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + ListenableFuture> timeseriesActivityDataFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), ACTIVITY_KEYS_WITHOUT_INACTIVITY_TIMEOUT); + ListenableFuture> inactivityTimeoutAttributeFuture = attributesService.find( + TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT + ); + + ListenableFuture> fullActivityDataFuture = Futures.whenAllSucceed(timeseriesActivityDataFuture, inactivityTimeoutAttributeFuture).call(() -> { + List activityTimeseries = Futures.getDone(timeseriesActivityDataFuture); + Optional inactivityTimeoutAttribute = Futures.getDone(inactivityTimeoutAttributeFuture); + + List result; + if (inactivityTimeoutAttribute.isPresent()) { + result = new ArrayList<>(activityTimeseries.size() + 1); + result.addAll(activityTimeseries); + inactivityTimeoutAttribute.ifPresent(result::add); + } else { + return activityTimeseries; + } + + return result; + }, deviceStateCallbackExecutor); + + future = Futures.transform(fullActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor()); } else { - ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - future = Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + ListenableFuture> attributesActivityDataFuture = attributesService.find( + TenantId.SYS_TENANT_ID, device.getId(), AttributeScope.SERVER_SCOPE, ACTIVITY_KEYS_WITH_INACTIVITY_TIMEOUT + ); + future = Futures.transform(attributesActivityDataFuture, extractDeviceStateData(device), MoreExecutors.directExecutor()); } - return transformInactivityTimeout(future); + return future; } - private ListenableFuture transformInactivityTimeout(ListenableFuture future) { - return Futures.transformAsync(future, deviceStateData -> { - if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != defaultInactivityTimeoutMs) { - return future; //fail fast - } - var attributesFuture = attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), AttributeScope.SERVER_SCOPE, INACTIVITY_TIMEOUT); - return Futures.transform(attributesFuture, attributes -> { - attributes.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> { - if (inactivityTimeout > 0) { - deviceStateData.getState().setInactivityTimeout(inactivityTimeout); - } - }); - return deviceStateData; - }, MoreExecutors.directExecutor()); - }, deviceStateCallbackExecutor); - } - - private Function, DeviceStateData> extractDeviceStateData(Device device) { + private Function, DeviceStateData> extractDeviceStateData(Device device) { return new Function<>() { @Nonnull @Override - public DeviceStateData apply(@Nullable List data) { + public DeviceStateData apply(@Nullable List data) { try { long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L); long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L); @@ -690,7 +700,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService 0 ? inactivityTimeout : defaultInactivityTimeoutMs) .build(); TbMsgMetaData md = new TbMsgMetaData(); md.putValue("deviceName", device.getName()); @@ -761,12 +771,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService keys, TbCallback callback) { onTimeSeriesUpdate(entityId, keys.stream().map(key -> new BasicTsKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList())); - if (entityId.getEntityType() == EntityType.DEVICE) { - deleteDeviceInactivityTimeout(tenantId, entityId, keys); - } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d0507d4fee..36cd5516a8 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,7 +31,6 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -39,11 +38,9 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; @@ -55,7 +52,6 @@ import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; -import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; @@ -82,7 +78,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageStateService apiUsageStateService; private final CalculatedFieldQueueService calculatedFieldQueueService; - private final DeviceStateManager deviceStateManager; private ExecutorService tsCallBackExecutor; @@ -94,15 +89,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Lazy TbEntityViewService tbEntityViewService, TbApiUsageReportClient apiUsageClient, TbApiUsageStateService apiUsageStateService, - CalculatedFieldQueueService calculatedFieldQueueService, - DeviceStateManager deviceStateManager) { + CalculatedFieldQueueService calculatedFieldQueueService) { this.attrService = attrService; this.tsService = tsService; this.tbEntityViewService = tbEntityViewService; this.apiUsageClient = apiUsageClient; this.apiUsageStateService = apiUsageStateService; this.calculatedFieldQueueService = calculatedFieldQueueService; - this.deviceStateManager = deviceStateManager; } @PostConstruct @@ -165,14 +158,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } }, t -> request.getCallback().onFailure(t)); - if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) { - findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout -> - addMainCallback(resultFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate( - tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY) - ) - ); - } - if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); } @@ -182,21 +167,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer return resultFuture; } - private static Optional findNewInactivityTimeout(List entries) { - return entries.stream() - .filter(entry -> Objects.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT, entry.getKey())) - .findFirst() - .map(DefaultTelemetrySubscriptionService::parseAsLong); - } - - private static long parseAsLong(KvEntry kve) { - try { - return Long.parseLong(kve.getValueAsString()); - } catch (NumberFormatException e) { - return 0L; - } - } - @Override public void saveAttributes(AttributesSaveRequest request) { checkInternalEntity(request.getEntityId()); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 26c978bbd0..4e36a44957 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -38,9 +38,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.query.EntityData; -import org.thingsboard.server.common.data.query.EntityKeyType; -import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; @@ -88,7 +85,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.thingsboard.server.service.state.DefaultDeviceStateService.ACTIVITY_STATE; import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_ALARM_TIME; -import static org.thingsboard.server.service.state.DefaultDeviceStateService.INACTIVITY_TIMEOUT; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_ACTIVITY_TIME; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME; import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_DISCONNECT_TIME; @@ -508,42 +504,6 @@ public class DefaultDeviceStateServiceTest { verify(service).fetchDeviceStateDataUsingSeparateRequests(deviceId); } - @Test - public void givenPersistToTelemetryAndDefaultInactivityTimeoutFetched_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() { - var defaultInactivityTimeoutInSec = 60L; - var latest = - Map.of( - EntityKeyType.TIME_SERIES, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(defaultInactivityTimeoutInSec * 1000))), - EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L))) - ); - - process(latest, defaultInactivityTimeoutInSec); - } - - @Test - public void givenPersistToTelemetryAndNoInactivityTimeoutFetchedFromTimeSeries_whenTransformingToDeviceStateData_thenTryGetInactivityFromAttribute() { - var defaultInactivityTimeoutInSec = 60L; - var latest = - Map.of( - EntityKeyType.SERVER_ATTRIBUTE, Map.of(INACTIVITY_TIMEOUT, new TsValue(0, Long.toString(5000L))) - ); - - process(latest, defaultInactivityTimeoutInSec); - } - - private void process(Map> latest, long defaultInactivityTimeoutInSec) { - service.setDefaultInactivityTimeoutInSec(defaultInactivityTimeoutInSec); - service.setDefaultInactivityTimeoutMs(defaultInactivityTimeoutInSec * 1000); - service.setPersistToTelemetry(true); - - var deviceUuid = UUID.randomUUID(); - var deviceId = new DeviceId(deviceUuid); - - DeviceStateData deviceStateData = service.toDeviceStateData(new EntityData(deviceId, latest, Map.of()), new DeviceIdInfo(TenantId.SYS_TENANT_ID.getId(), UUID.randomUUID(), deviceUuid)); - - assertThat(deviceStateData.getState().getInactivityTimeout()).isEqualTo(5000L); - } - private void initStateService(long timeout) throws InterruptedException { service.stop(); reset(service, telemetrySubscriptionService); diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b1c84184d2..6416fb4f98 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -24,30 +24,25 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; -import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.AttributesEntityView; @@ -78,17 +73,14 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; -import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) class DefaultTelemetrySubscriptionServiceTest { @@ -132,14 +124,12 @@ class DefaultTelemetrySubscriptionServiceTest { TbApiUsageStateService apiUsageStateService; @Mock CalculatedFieldQueueService calculatedFieldQueueService; - @Mock - DeviceStateManager deviceStateManager; DefaultTelemetrySubscriptionService telemetryService; @BeforeEach void setup() { - telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService, deviceStateManager); + telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, calculatedFieldQueueService); ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService); ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService); ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService)); @@ -180,6 +170,28 @@ class DefaultTelemetrySubscriptionServiceTest { tsCallBackExecutor.shutdownNow(); } + /* --- Save time series API --- */ + + @Test + void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(new ApiUsageStateId(UUID.randomUUID())) + .entries(sampleTelemetry) + .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) + .build(); + + // WHEN + assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Can't update API Usage State!"); + + // THEN + then(tsService).shouldHaveNoInteractions(); + } + @Test void shouldReportStorageDataPointsApiUsageWhenTimeSeriesIsSaved() { // GIVEN @@ -389,148 +401,6 @@ class DefaultTelemetrySubscriptionServiceTest { ); } - @Test - void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { - // GIVEN - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(new ApiUsageStateId(UUID.randomUUID())) - .entries(sampleTelemetry) - .strategy(TimeseriesSaveRequest.Strategy.PROCESS_ALL) - .build(); - - // WHEN - assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) - .isInstanceOf(RuntimeException.class) - .hasMessage("Can't update API Usage State!"); - - // THEN - then(tsService).shouldHaveNoInteractions(); - then(deviceStateManager).shouldHaveNoInteractions(); - } - - @Test - void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY); - } - - @ParameterizedTest - @EnumSource( - value = EntityType.class, - names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test - mode = EnumSource.Mode.EXCLUDE - ) - void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) { - // GIVEN - var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(nonDeviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList())); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(true, false, true, true)) - .build(); - - given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(TimeseriesSaveResult.of(1, null))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(notInactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(TimeseriesSaveResult.of(1, listOfNNumbers(1)))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - - @Test - void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() { - // GIVEN - var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); - var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); - - var request = TimeseriesSaveRequest.builder() - .tenantId(tenantId) - .customerId(customerId) - .entityId(deviceId) - .entry(inactivityTimeout) - .strategy(new TimeseriesSaveRequest.Strategy(false, true, false, false)) - .build(); - - given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save"))); - - // WHEN - telemetryService.saveTimeseries(request); - - // THEN - then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); - } - // used to emulate sequence numbers returned by save latest API private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList();