Add decrease inactivity timeout test with a controlled time
This commit is contained in:
parent
5e6dd652d7
commit
f65f152e0c
@ -223,7 +223,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
log.trace("on Device Connect [{}]", deviceId.getId());
|
log.trace("on Device Connect [{}]", deviceId.getId());
|
||||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||||
long ts = System.currentTimeMillis();
|
long ts = getCurrentTimeMillis();
|
||||||
stateData.getState().setLastConnectTime(ts);
|
stateData.getState().setLastConnectTime(ts);
|
||||||
save(deviceId, LAST_CONNECT_TIME, ts);
|
save(deviceId, LAST_CONNECT_TIME, ts);
|
||||||
pushRuleEngineMessage(stateData, TbMsgType.CONNECT_EVENT);
|
pushRuleEngineMessage(stateData, TbMsgType.CONNECT_EVENT);
|
||||||
@ -264,7 +264,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||||
long ts = System.currentTimeMillis();
|
long ts = getCurrentTimeMillis();
|
||||||
stateData.getState().setLastDisconnectTime(ts);
|
stateData.getState().setLastDisconnectTime(ts);
|
||||||
save(deviceId, LAST_DISCONNECT_TIME, ts);
|
save(deviceId, LAST_DISCONNECT_TIME, ts);
|
||||||
pushRuleEngineMessage(stateData, TbMsgType.DISCONNECT_EVENT);
|
pushRuleEngineMessage(stateData, TbMsgType.DISCONNECT_EVENT);
|
||||||
@ -398,10 +398,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
|
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
|
||||||
var deviceState = state.getState();
|
var deviceState = state.getState();
|
||||||
if (deviceState.isActive()) {
|
if (deviceState.isActive()) {
|
||||||
updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state);
|
updateInactivityStateIfExpired(getCurrentTimeMillis(), deviceId, state);
|
||||||
} else {
|
} else {
|
||||||
//trying to fix activity state
|
//trying to fix activity state
|
||||||
if (isActive(System.currentTimeMillis(), deviceState)) {
|
if (isActive(getCurrentTimeMillis(), deviceState)) {
|
||||||
updateActivityState(deviceId, state, deviceState.getLastActivityTime());
|
updateActivityState(deviceId, state, deviceState.getLastActivityTime());
|
||||||
if (deviceState.getLastInactivityAlarmTime() != 0L && deviceState.getLastInactivityAlarmTime() >= deviceState.getLastActivityTime()) {
|
if (deviceState.getLastInactivityAlarmTime() != 0L && deviceState.getLastInactivityAlarmTime() >= deviceState.getLastActivityTime()) {
|
||||||
deviceState.setLastInactivityAlarmTime(0L);
|
deviceState.setLastInactivityAlarmTime(0L);
|
||||||
@ -425,7 +425,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
|
|
||||||
void checkStates() {
|
void checkStates() {
|
||||||
try {
|
try {
|
||||||
final long ts = System.currentTimeMillis();
|
final long ts = getCurrentTimeMillis();
|
||||||
partitionedEntities.forEach((tpi, deviceIds) -> {
|
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||||
Set<DeviceId> idsFromRemovedTenant = new HashSet<>();
|
Set<DeviceId> idsFromRemovedTenant = new HashSet<>();
|
||||||
@ -798,7 +798,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
if (persistToTelemetry) {
|
if (persistToTelemetry) {
|
||||||
tsSubService.saveAndNotifyInternal(
|
tsSubService.saveAndNotifyInternal(
|
||||||
TenantId.SYS_TENANT_ID, deviceId,
|
TenantId.SYS_TENANT_ID, deviceId,
|
||||||
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))),
|
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new LongDataEntry(key, value))),
|
||||||
new TelemetrySaveCallback<>(deviceId, key, value));
|
new TelemetrySaveCallback<>(deviceId, key, value));
|
||||||
} else {
|
} else {
|
||||||
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
|
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
|
||||||
@ -809,13 +809,17 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
if (persistToTelemetry) {
|
if (persistToTelemetry) {
|
||||||
tsSubService.saveAndNotifyInternal(
|
tsSubService.saveAndNotifyInternal(
|
||||||
TenantId.SYS_TENANT_ID, deviceId,
|
TenantId.SYS_TENANT_ID, deviceId,
|
||||||
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))),
|
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new BooleanDataEntry(key, value))),
|
||||||
new TelemetrySaveCallback<>(deviceId, key, value));
|
new TelemetrySaveCallback<>(deviceId, key, value));
|
||||||
} else {
|
} else {
|
||||||
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
|
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getCurrentTimeMillis() {
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
private static class TelemetrySaveCallback<T> implements FutureCallback<T> {
|
private static class TelemetrySaveCallback<T> implements FutureCallback<T> {
|
||||||
private final DeviceId deviceId;
|
private final DeviceId deviceId;
|
||||||
private final String key;
|
private final String key;
|
||||||
|
|||||||
@ -54,6 +54,7 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
@ -356,6 +357,56 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
verify(telemetrySubscriptionService, times(1)).saveAttrAndNotify(any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(isActive), any());
|
verify(telemetrySubscriptionService, times(1)).saveAttrAndNotify(any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(isActive), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("provideParametersForDecreaseInactivityTimeout")
|
||||||
|
public void givenTestParameters_whenOnDeviceInactivityTimeout_thenShouldBeInTheExpectedStateAndPerformExpectedActions(
|
||||||
|
boolean activityState, long newInactivityTimeout, long timeIncrement, boolean expectedActivityState
|
||||||
|
) throws Exception {
|
||||||
|
// GIVEN
|
||||||
|
long defaultInactivityTimeout = 10000;
|
||||||
|
initStateService(defaultInactivityTimeout);
|
||||||
|
|
||||||
|
var currentTime = new AtomicLong(System.currentTimeMillis());
|
||||||
|
|
||||||
|
DeviceState deviceState = DeviceState.builder()
|
||||||
|
.active(activityState)
|
||||||
|
.lastActivityTime(currentTime.get())
|
||||||
|
.inactivityTimeout(defaultInactivityTimeout)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
DeviceStateData deviceStateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(new TbMsgMetaData())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
|
given(service.getCurrentTimeMillis()).willReturn(currentTime.addAndGet(timeIncrement));
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newInactivityTimeout);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout);
|
||||||
|
assertThat(deviceState.isActive()).isEqualTo(expectedActivityState);
|
||||||
|
if (activityState && !expectedActivityState) {
|
||||||
|
then(telemetrySubscriptionService).should().saveAttrAndNotify(
|
||||||
|
any(), eq(deviceId), any(), eq(ACTIVITY_STATE), eq(false), any()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<Arguments> provideParametersForDecreaseInactivityTimeout() {
|
||||||
|
return Stream.of(
|
||||||
|
Arguments.of(true, 1, 0, true),
|
||||||
|
|
||||||
|
Arguments.of(true, 1, 1, false)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
|
public void givenStateDataIsNull_whenUpdateInactivityTimeoutIfExpired_thenShouldCleanupDevice() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user