Merge pull request #8841 from YevhenBondarenko/fix/inactivity
fixed update inactivity timeout attribute
This commit is contained in:
commit
389d05a567
@ -228,7 +228,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
save(deviceId, LAST_CONNECT_TIME, ts);
|
save(deviceId, LAST_CONNECT_TIME, ts);
|
||||||
pushRuleEngineMessage(stateData, TbMsgType.CONNECT_EVENT);
|
pushRuleEngineMessage(stateData, TbMsgType.CONNECT_EVENT);
|
||||||
checkAndUpdateState(deviceId, stateData);
|
checkAndUpdateState(deviceId, stateData);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -397,12 +396,17 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
|
|
||||||
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
|
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
|
||||||
if (state.getState().isActive()) {
|
var deviceState = state.getState();
|
||||||
|
if (deviceState.isActive()) {
|
||||||
updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state);
|
updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state);
|
||||||
} else {
|
} else {
|
||||||
//trying to fix activity state
|
//trying to fix activity state
|
||||||
if (isActive(System.currentTimeMillis(), state.getState())) {
|
if (isActive(System.currentTimeMillis(), deviceState)) {
|
||||||
updateActivityState(deviceId, state, state.getState().getLastActivityTime());
|
updateActivityState(deviceId, state, deviceState.getLastActivityTime());
|
||||||
|
if (deviceState.getLastInactivityAlarmTime() != 0L && deviceState.getLastInactivityAlarmTime() >= deviceState.getLastActivityTime()) {
|
||||||
|
deviceState.setLastInactivityAlarmTime(0L);
|
||||||
|
save(deviceId, INACTIVITY_ALARM_TIME, 0L);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,19 +22,30 @@ import org.junit.runner.RunWith;
|
|||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.junit.MockitoJUnitRunner;
|
import org.mockito.junit.MockitoJUnitRunner;
|
||||||
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
import org.thingsboard.server.common.data.query.EntityData;
|
import org.thingsboard.server.common.data.query.EntityData;
|
||||||
import org.thingsboard.server.common.data.query.EntityKeyType;
|
import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||||
import org.thingsboard.server.common.data.query.TsValue;
|
import org.thingsboard.server.common.data.query.TsValue;
|
||||||
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.device.DeviceService;
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
|
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
|
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||||
|
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@ -62,14 +73,32 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
PartitionService partitionService;
|
PartitionService partitionService;
|
||||||
@Mock
|
@Mock
|
||||||
DeviceStateData deviceStateDataMock;
|
DeviceStateData deviceStateDataMock;
|
||||||
|
@Mock
|
||||||
|
EntityQueryRepository entityQueryRepository;
|
||||||
|
|
||||||
|
TenantId tenantId = new TenantId(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"));
|
||||||
DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
|
DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
|
||||||
|
TopicPartitionInfo tpi;
|
||||||
|
|
||||||
DefaultDeviceStateService service;
|
DefaultDeviceStateService service;
|
||||||
|
|
||||||
|
TelemetrySubscriptionService telemetrySubscriptionService;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, null, null, null, mock(NotificationRuleProcessor.class)));
|
service = spy(new DefaultDeviceStateService(deviceService, attributesService, tsService, clusterService, partitionService, entityQueryRepository, null, null, mock(NotificationRuleProcessor.class)));
|
||||||
|
telemetrySubscriptionService = Mockito.mock(TelemetrySubscriptionService.class);
|
||||||
|
ReflectionTestUtils.setField(service, "tsSubService", telemetrySubscriptionService);
|
||||||
|
ReflectionTestUtils.setField(service, "defaultStateCheckIntervalInSec", 60);
|
||||||
|
ReflectionTestUtils.setField(service, "defaultActivityStatsIntervalInSec", 60);
|
||||||
|
ReflectionTestUtils.setField(service, "initFetchPackSize", 10);
|
||||||
|
|
||||||
|
tpi = TopicPartitionInfo.builder().myPartition(true).build();
|
||||||
|
Mockito.when(partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(tpi);
|
||||||
|
Mockito.when(entityQueryRepository.findEntityDataByQueryInternal(Mockito.any())).thenReturn(new PageData<>());
|
||||||
|
var deviceIdInfo = new DeviceIdInfo(tenantId.getId(), null, deviceId.getId());
|
||||||
|
Mockito.when(deviceService.findDeviceIdInfos(Mockito.any()))
|
||||||
|
.thenReturn(new PageData<>(List.of(deviceIdInfo), 0, 1, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -125,4 +154,188 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
Assert.assertEquals(5000L, deviceStateData.getState().getInactivityTimeout());
|
Assert.assertEquals(5000L, deviceStateData.getState().getInactivityTimeout());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initStateService(long timeout) throws InterruptedException {
|
||||||
|
service.stop();
|
||||||
|
Mockito.reset(service, telemetrySubscriptionService);
|
||||||
|
ReflectionTestUtils.setField(service, "defaultInactivityTimeoutMs", timeout);
|
||||||
|
service.init();
|
||||||
|
PartitionChangeEvent event = new PartitionChangeEvent(this, new QueueKey(ServiceType.TB_CORE), Collections.singleton(tpi));
|
||||||
|
service.onApplicationEvent(event);
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void increaseInactivityForInactiveDeviceTest() throws Exception {
|
||||||
|
final long defaultTimeout = 1;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
DeviceState deviceState = DeviceState.builder().build();
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(defaultTimeout);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
long increase = 100;
|
||||||
|
long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase;
|
||||||
|
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(increase);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(newTimeout + 5);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void increaseInactivityForActiveDeviceTest() throws Exception {
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
DeviceState deviceState = DeviceState.builder().build();
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
long increase = 100;
|
||||||
|
long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase;
|
||||||
|
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
|
||||||
|
Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any());
|
||||||
|
Thread.sleep(defaultTimeout + increase);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(newTimeout);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void increaseSmallInactivityForInactiveDeviceTest() throws Exception {
|
||||||
|
final long defaultTimeout = 1;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
DeviceState deviceState = DeviceState.builder().build();
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(defaultTimeout);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
long newTimeout = 1;
|
||||||
|
Thread.sleep(newTimeout);
|
||||||
|
Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decreaseInactivityForActiveDeviceTest() throws Exception {
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
DeviceState deviceState = DeviceState.builder().build();
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any());
|
||||||
|
|
||||||
|
long newTimeout = 1;
|
||||||
|
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
|
||||||
|
activityVerify(false);
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, defaultTimeout);
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(defaultTimeout);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void decreaseInactivityForInactiveDeviceTest() throws Exception {
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
DeviceState deviceState = DeviceState.builder().build();
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||||
|
activityVerify(true);
|
||||||
|
Thread.sleep(defaultTimeout);
|
||||||
|
service.checkStates();
|
||||||
|
activityVerify(false);
|
||||||
|
Mockito.reset(telemetrySubscriptionService);
|
||||||
|
|
||||||
|
long newTimeout = 1;
|
||||||
|
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
|
||||||
|
Mockito.verify(telemetrySubscriptionService, Mockito.never()).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.any(), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void activityVerify(boolean isActive) {
|
||||||
|
Mockito.verify(telemetrySubscriptionService, Mockito.times(1)).saveAttrAndNotify(Mockito.any(), Mockito.eq(deviceId), Mockito.any(), Mockito.eq("active"), Mockito.eq(isActive), Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user