diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 8b09565ef9..cdc5afe361 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -228,7 +228,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService= deviceState.getLastActivityTime()) { + deviceState.setLastInactivityAlarmTime(0L); + save(deviceId, INACTIVITY_ALARM_TIME, 0L); + } } } } diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 52f2ec5c9d..ca30f9e5ed 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -22,19 +22,30 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.TsValue; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.sql.query.EntityQueryRepository; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -62,14 +73,32 @@ public class DefaultDeviceStateServiceTest { PartitionService partitionService; @Mock DeviceStateData deviceStateDataMock; + @Mock + EntityQueryRepository entityQueryRepository; + TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112")); DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"); + TopicPartitionInfo tpi; DefaultDeviceStateService service; + TelemetrySubscriptionService telemetrySubscriptionService; + @Before public void setUp() { - service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, null, null, null, mock(NotificationRuleProcessor.class))); + service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, null, mock(NotificationRuleProcessor.class))); + telemetrySubscriptionService = Mockito.mock(TelemetrySubscriptionService.class); + ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService); + ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60); + ReflectionTestUtils.setField(service, "defaultActivityStatsIntervalInSec", 60); + ReflectionTestUtils.setField(service, "initFetchPackSize", 10); + + tpi = TopicPartitionInfo.builder().myPartition(true).build(); + Mockito.when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi); + Mockito.when(entityQueryRepository.findEntityDataByQueryInternal(Mockito.any())).thenReturn(new PageData<>()); + var deviceIdInfo = new DeviceIdInfo(tenantId.getId(), null, deviceId.getId()); + Mockito.when(deviceService.findDeviceIdInfos(Mockito.any())) + .thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false)); } @Test @@ -125,4 +154,188 @@ public class DefaultDeviceStateServiceTest { Assert.assertEquals(5000L, deviceStateData.getState().getInactivityTimeout()); } + private void initStateService(long timeout) throws InterruptedException { + service.stop(); + Mockito.reset(service, telemetrySubscriptionService); + ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout); + service.init(); + PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi)); + service.onApplicationEvent(event); + Thread.sleep(100); + } + + @Test + public void increaseInactivityForInactiveDeviceTest() throws Exception { + final long defaultTimeout = 1; + initStateService(defaultTimeout); + DeviceState deviceState = DeviceState.builder().build(); + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + Thread.sleep(defaultTimeout); + service.checkStates(); + activityVerify(false); + + Mockito.reset(telemetrySubscriptionService); + + long increase = 100; + long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase; + + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); + activityVerify(true); + Thread.sleep(increase); + service.checkStates(); + activityVerify(false); + + Mockito.reset(telemetrySubscriptionService); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + Thread.sleep(newTimeout + 5); + service.checkStates(); + activityVerify(false); + } + + @Test + public void increaseInactivityForActiveDeviceTest() throws Exception { + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + DeviceState deviceState = DeviceState.builder().build(); + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + + Mockito.reset(telemetrySubscriptionService); + + long increase = 100; + long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase; + + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); + Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any()); + Thread.sleep(defaultTimeout + increase); + service.checkStates(); + activityVerify(false); + + Mockito.reset(telemetrySubscriptionService); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + Thread.sleep(newTimeout); + service.checkStates(); + activityVerify(false); + } + + @Test + public void increaseSmallInactivityForInactiveDeviceTest() throws Exception { + final long defaultTimeout = 1; + initStateService(defaultTimeout); + DeviceState deviceState = DeviceState.builder().build(); + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + Thread.sleep(defaultTimeout); + service.checkStates(); + activityVerify(false); + + Mockito.reset(telemetrySubscriptionService); + + long newTimeout = 1; + Thread.sleep(newTimeout); + Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any()); + } + + @Test + public void decreaseInactivityForActiveDeviceTest() throws Exception { + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + DeviceState deviceState = DeviceState.builder().build(); + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + + Mockito.reset(telemetrySubscriptionService); + + Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any()); + + long newTimeout = 1; + + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); + activityVerify(false); + Mockito.reset(telemetrySubscriptionService); + + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, defaultTimeout); + activityVerify(true); + Thread.sleep(defaultTimeout); + service.checkStates(); + activityVerify(false); + } + + @Test + public void decreaseInactivityForInactiveDeviceTest() throws Exception { + final long defaultTimeout = 1000; + initStateService(defaultTimeout); + DeviceState deviceState = DeviceState.builder().build(); + DeviceStateData deviceStateData = DeviceStateData.builder() + .tenantId(tenantId) + .deviceId(deviceId) + .state(deviceState) + .metaData(new TbMsgMetaData()) + .build(); + + service.deviceStates.put(deviceId, deviceStateData); + service.getPartitionedEntities(tpi).add(deviceId); + + service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis()); + activityVerify(true); + Thread.sleep(defaultTimeout); + service.checkStates(); + activityVerify(false); + Mockito.reset(telemetrySubscriptionService); + + long newTimeout = 1; + + service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); + Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any()); + } + + private void activityVerify(boolean isActive) { + Mockito.verify(telemetrySubscriptionService, Mockito.times(1)).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.eq(isActive), Mockito.any()); + } + } \ No newline at end of file