diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 23276dc104..1f9239cea3 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -223,7 +223,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService= deviceState.getLastActivityTime()) { deviceState.setLastInactivityAlarmTime(0L); @@ -425,7 +425,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService { log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); Set idsFromRemovedTenant = new HashSet<>(); @@ -455,7 +455,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService> stats = new HashMap<>(); for (DeviceStateData stateData : deviceStates.values()) { Pair tenantDevicesActivity = stats.computeIfAbsent(stateData.getTenantId(), @@ -798,7 +798,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); @@ -809,13 +809,17 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); } } + long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + private static class TelemetrySaveCallback implements FutureCallback { private final DeviceId deviceId; private final String key; diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index b06bbab0de..d03771cde3 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -356,6 +357,56 @@ public class DefaultDeviceStateServiceTest { verify(telemetrySubscriptionService, times(1)).saveAttrAndNotify(any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(isActive), any()); } + @ParameterizedTest + @MethodSource("provideParametersForDecreaseInactivityTimeout") + public void givenTestParameters_whenOnDeviceInactivityTimeout_thenShouldBeInTheExpectedStateAndPerformExpectedActions( + boolean activityState, long newInactivityTimeout, long timeIncrement, boolean expectedActivityState + ) throws Exception { + // GIVEN + long defaultInactivityTimeout = 10000; + initStateService(defaultInactivityTimeout); + + var currentTime = new AtomicLong(System.currentTimeMillis()); + + DeviceState deviceState = DeviceState.builder() + .active(activityState) + .lastActivityTime(currentTime.get()) + .inactivityTimeout(defaultInactivityTimeout) + .build(); + + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + given(service.getCurrentTimeMillis()).willReturn(currentTime.addAndGet(timeIncrement)); + + // WHEN + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout); + + // THEN + assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout); + assertThat(deviceState.isActive()).isEqualTo(expectedActivityState); + if (activityState && !expectedActivityState) { + then(telemetrySubscriptionService).should().saveAttrAndNotify( + any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any() + ); + } + } + + private static Stream provideParametersForDecreaseInactivityTimeout() { + return Stream.of( + Arguments.of(true, 1, 0, true), + + Arguments.of(true, 1, 1, false) + ); + } + @Test public void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() { // GIVEN