Update last inactivity alarm time after a successful database save; improve tests
This commit is contained in:
parent
d917c72d51
commit
baaa9f7235
@ -340,7 +340,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.trace("[{}][{}] On device inactivity: processing inactivity event with ts [{}].", tenantId.getId(), deviceId.getId(), lastInactivityTime);
|
log.trace("[{}][{}] On device inactivity: processing inactivity event with ts [{}].", tenantId.getId(), deviceId.getId(), lastInactivityTime);
|
||||||
reportInactivity(lastInactivityTime, deviceId, stateData);
|
reportInactivity(lastInactivityTime, stateData);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -559,7 +559,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() <= state.getLastActivityTime())
|
&& (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() <= state.getLastActivityTime())
|
||||||
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() <= ts) {
|
&& stateData.getDeviceCreationTime() + state.getInactivityTimeout() <= ts) {
|
||||||
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
|
if (partitionService.resolve(ServiceType.TB_CORE, stateData.getTenantId(), deviceId).isMyPartition()) {
|
||||||
reportInactivity(ts, deviceId, stateData);
|
reportInactivity(ts, stateData);
|
||||||
} else {
|
} else {
|
||||||
cleanupEntity(deviceId);
|
cleanupEntity(deviceId);
|
||||||
}
|
}
|
||||||
@ -570,13 +570,24 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reportInactivity(long ts, DeviceId deviceId, DeviceStateData stateData) {
|
private void reportInactivity(long ts, DeviceStateData stateData) {
|
||||||
DeviceState state = stateData.getState();
|
var tenantId = stateData.getTenantId();
|
||||||
state.setLastInactivityAlarmTime(ts);
|
var deviceId = stateData.getDeviceId();
|
||||||
save(stateData.getTenantId(), deviceId, INACTIVITY_ALARM_TIME, ts);
|
|
||||||
|
Futures.addCallback(save(stateData.getTenantId(), deviceId, INACTIVITY_ALARM_TIME, ts), new FutureCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Void success) {
|
||||||
|
stateData.getState().setLastInactivityAlarmTime(ts);
|
||||||
onDeviceActivityStatusChange(false, stateData);
|
onDeviceActivityStatusChange(false, stateData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(@NonNull Throwable t) {
|
||||||
|
log.error("[{}][{}] Failed to update device last inactivity alarm time to '{}'. Device state data: {}", tenantId, deviceId, ts, stateData, t);
|
||||||
|
}
|
||||||
|
}, deviceStateCallbackExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isActive(long ts, DeviceState state) {
|
private static boolean isActive(long ts, DeviceState state) {
|
||||||
return ts < state.getLastActivityTime() + state.getInactivityTimeout();
|
return ts < state.getLastActivityTime() + state.getInactivityTimeout();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -882,10 +882,8 @@ class DefaultDeviceStateServiceTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void givenInactiveDevice_whenActivityStatusChangesToActiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache() {
|
void givenInactiveDevice_whenActivityStatusChangesToActiveButFailedToSaveUpdatedActivityStatus_thenShouldNotUpdateCache2() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
doReturn(200L).when(service).getCurrentTimeMillis();
|
|
||||||
|
|
||||||
var deviceState = DeviceState.builder()
|
var deviceState = DeviceState.builder()
|
||||||
.active(false)
|
.active(false)
|
||||||
.lastActivityTime(100L)
|
.lastActivityTime(100L)
|
||||||
@ -902,20 +900,21 @@ class DefaultDeviceStateServiceTest {
|
|||||||
service.deviceStates.put(deviceId, deviceStateData);
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
service.getPartitionedEntities(tpi).add(deviceId);
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
when(telemetrySubscriptionService.saveAttributesInternal(any(AttributesSaveRequest.class)))
|
// WHEN-THEN
|
||||||
.thenAnswer(invocation -> {
|
|
||||||
AttributesSaveRequest request = invocation.getArgument(0);
|
|
||||||
AttributeKvEntry entry = request.getEntries().get(0);
|
|
||||||
return entry.getKey().equals(ACTIVITY_STATE) ?
|
|
||||||
Futures.immediateFailedFuture(new RuntimeException("failed to save")) :
|
|
||||||
Futures.immediateFuture(generateRandomVersions(1));
|
|
||||||
});
|
|
||||||
|
|
||||||
// WHEN
|
// simulating short DB outage
|
||||||
service.onDeviceActivity(tenantId, deviceId, 220L);
|
given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save")));
|
||||||
|
doReturn(200L).when(service).getCurrentTimeMillis();
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, 180L);
|
||||||
|
assertThat(deviceState.isActive()).isFalse(); // still inactive
|
||||||
|
|
||||||
// THEN
|
// 10 millis pass... and new activity message it received
|
||||||
assertThat(deviceState.isActive()).isFalse();
|
|
||||||
|
// this time DB save is successful
|
||||||
|
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1)));
|
||||||
|
doReturn(210L).when(service).getCurrentTimeMillis();
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, 190L);
|
||||||
|
assertThat(deviceState.isActive()).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -937,21 +936,21 @@ class DefaultDeviceStateServiceTest {
|
|||||||
service.deviceStates.put(deviceId, deviceStateData);
|
service.deviceStates.put(deviceId, deviceStateData);
|
||||||
service.getPartitionedEntities(tpi).add(deviceId);
|
service.getPartitionedEntities(tpi).add(deviceId);
|
||||||
|
|
||||||
when(telemetrySubscriptionService.saveAttributesInternal(any(AttributesSaveRequest.class)))
|
// WHEN-THEN (assuming periodic activity states check is done every 100 millis)
|
||||||
.thenAnswer(invocation -> {
|
|
||||||
AttributesSaveRequest request = invocation.getArgument(0);
|
|
||||||
AttributeKvEntry entry = request.getEntries().get(0);
|
|
||||||
return entry.getKey().equals(ACTIVITY_STATE) ?
|
|
||||||
Futures.immediateFailedFuture(new RuntimeException("failed to save")) :
|
|
||||||
Futures.immediateFuture(generateRandomVersions(1));
|
|
||||||
});
|
|
||||||
|
|
||||||
// WHEN
|
// simulating short DB outage
|
||||||
|
given(telemetrySubscriptionService.saveAttributesInternal(any())).willReturn(Futures.immediateFailedFuture(new RuntimeException("failed to save")));
|
||||||
doReturn(200L).when(service).getCurrentTimeMillis();
|
doReturn(200L).when(service).getCurrentTimeMillis();
|
||||||
service.checkStates();
|
service.checkStates();
|
||||||
|
assertThat(deviceState.isActive()).isTrue(); // still active
|
||||||
|
|
||||||
// THEN
|
// waiting 100 millis... periodic activity states check is triggered again
|
||||||
assertThat(deviceState.isActive()).isTrue();
|
|
||||||
|
// this time DB save is successful
|
||||||
|
when(telemetrySubscriptionService.saveAttributesInternal(any())).thenReturn(Futures.immediateFuture(generateRandomVersions(1)));
|
||||||
|
doReturn(300L).when(service).getCurrentTimeMillis();
|
||||||
|
service.checkStates();
|
||||||
|
assertThat(deviceState.isActive()).isFalse();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user