default state service: update active/inactive state after fetching device on initPartition

This commit is contained in:
Sergey Matvienko 2021-05-17 10:37:14 +03:00 committed by Andrew Shvayka
parent 254c87e29f
commit a9c655b179

View File

@ -163,7 +163,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
// Should be always single threaded due to absence of locks.
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled")));
scheduledExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
}
@PreDestroy
@ -207,19 +207,25 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L);
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("on Device Activity - fetched state {} for device {}", stateData, deviceId);
if (stateData != null) {
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
DeviceState state = stateData.getState();
state.setLastActivityTime(lastReportedActivity);
if (!state.isActive()) {
state.setActive(true);
save(deviceId, ACTIVITY_STATE, state.isActive());
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
}
final DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
updateActivityState(deviceId, stateData, lastReportedActivity);
}
}
void updateActivityState(DeviceId deviceId, DeviceStateData stateData, long lastReportedActivity) {
log.trace("updateActivityState - fetched state {} for device {}, lastReportedActivity {}", stateData, deviceId, lastReportedActivity);
if (stateData != null) {
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
DeviceState state = stateData.getState();
state.setLastActivityTime(lastReportedActivity);
if (!state.isActive()) {
state.setActive(true);
save(deviceId, ACTIVITY_STATE, true);
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
}
} else {
log.warn("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
}
}
@ -247,7 +253,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
DeviceState state = stateData.getState();
state.setInactivityTimeout(inactivityTimeout);
boolean oldActive = state.isActive();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
state.setActive(isActive(ts, state));
if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
save(deviceId, ACTIVITY_STATE, state.isActive());
}
@ -405,7 +411,15 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
public Void apply(@Nullable DeviceStateData state) {
if (state != null) {
addDeviceUsingState(tpi, state);
//updateState(device.getId()); //TODO update activity or inactivity state
if (state.getState().isActive()) {
updateInactivityStateIfExpired(System.currentTimeMillis(), device.getId(), state);
} else {
//trying to fix activity state
if (isActive(System.currentTimeMillis(), state.getState())) {
updateActivityState(device.getId(), state, state.getState().getLastActivityTime());
}
}
} else {
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId());
}
@ -432,30 +446,30 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
deviceStates.put(state.getDeviceId(), state);
}
void updateState() {
void updateInactivityStateIfExpired() {
final long ts = System.currentTimeMillis();
log.debug("Calculating state updates for {} devices", deviceStates.size());
Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
for (DeviceId deviceId : deviceIds) {
updateState(ts, deviceId);
updateInactivityStateIfExpired(ts, deviceId);
}
}
void updateState(DeviceId deviceId) {
updateState(System.currentTimeMillis(), deviceId);
void updateInactivityStateIfExpired(long ts, DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
updateInactivityStateIfExpired(ts, deviceId, stateData);
}
void updateState(long ts, DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
void updateInactivityStateIfExpired(long ts, DeviceId deviceId, DeviceStateData stateData) {
log.trace("Processing state {} for device {}", stateData, deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
if (!isActive(ts, state) && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setActive(false);
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, state.isActive());
save(deviceId, ACTIVITY_STATE, false);
}
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
@ -464,18 +478,29 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
}
}
boolean isActive(long ts, DeviceState state) {
return ts < state.getLastActivityTime() + state.getInactivityTimeout();
}
@Nonnull
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
DeviceStateData deviceStateData = deviceStates.get(deviceId);
if (deviceStateData == null) {
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device != null) {
try {
deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
} catch (InterruptedException | ExecutionException e) {
log.debug("[{}] Failed to fetch device state!", deviceId, e);
}
if (deviceStateData != null) {
return deviceStateData;
}
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
if (device != null) {
try {
deviceStateData = fetchDeviceState(device).get();
deviceStates.putIfAbsent(deviceId, deviceStateData);
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}] Failed to fetch device state!", deviceId, e);
throw new RuntimeException(e);
}
} else {
log.warn("[{}] Failed to fetch device by Id!", deviceId);
throw new RuntimeException("Failed to fetch device by Id " + deviceId);
}
return deviceStateData;
}