From 254c87e29f9527e3d73feca8f607206288d453ef Mon Sep 17 00:00:00 2001 From: Sergey Matvienko <50422128+handsome-serg@users.noreply.github.com> Date: Mon, 17 May 2021 09:36:19 +0300 Subject: [PATCH] default state service: refactor async DB call with Futures transformAsync --- .../state/DefaultDeviceStateService.java | 105 +++++++++++------- 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index ca8a731b65..f2fdaa64d3 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -61,6 +62,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -401,9 +403,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener deviceIds = new HashSet<>(deviceStates.keySet()); + void updateState() { + final long ts = System.currentTimeMillis(); log.debug("Calculating state updates for {} devices", deviceStates.size()); + Set deviceIds = new HashSet<>(deviceStates.keySet()); for (DeviceId deviceId : deviceIds) { - DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); - log.trace("Processing state {} for device {}", stateData, deviceId); - if (stateData != null) { - DeviceState state = stateData.getState(); - state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); - if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) { - state.setLastInactivityAlarmTime(ts); - pushRuleEngineMessage(stateData, INACTIVITY_EVENT); - save(deviceId, INACTIVITY_ALARM_TIME, ts); - save(deviceId, ACTIVITY_STATE, state.isActive()); - } - } else { - log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId); - deviceStates.remove(deviceId); - deviceLastSavedActivity.remove(deviceId); + updateState(ts, deviceId); + } + } + + void updateState(DeviceId deviceId) { + updateState(System.currentTimeMillis(), deviceId); + } + + void updateState(long ts, DeviceId deviceId) { + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); + log.trace("Processing state {} for device {}", stateData, deviceId); + if (stateData != null) { + DeviceState state = stateData.getState(); + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout()); + if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) { + state.setLastInactivityAlarmTime(ts); + pushRuleEngineMessage(stateData, INACTIVITY_EVENT); + save(deviceId, INACTIVITY_ALARM_TIME, ts); + save(deviceId, ACTIVITY_STATE, state.isActive()); } + } else { + log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId); + deviceStates.remove(deviceId); + deviceLastSavedActivity.remove(deviceId); } } @@ -492,39 +502,58 @@ public class DefaultDeviceStateService extends TbApplicationEventListener fetchDeviceState(Device device) { + ListenableFuture future; if (persistToTelemetry) { ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - return Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService); + future = Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService); } else { ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - return Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService); + future = Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService); } + return transformInactivityTimeout(future); + } + + // voba - schwarz fix to use timeseries/attributes for inactivity timeout + private ListenableFuture transformInactivityTimeout(ListenableFuture future) { + return Futures.transformAsync(future, deviceStateData -> { + if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)) { + return future; //fail fast + } + final SettableFuture resultFuture = SettableFuture.create(); + Futures.addCallback( + attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), SERVER_SCOPE, INACTIVITY_TIMEOUT), + new FutureCallback>() { + @Override + public void onSuccess(Optional inactivityTimeoutOpt) { + inactivityTimeoutOpt.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> { + if (inactivityTimeout > 0) { + deviceStateData.getState().setInactivityTimeout(inactivityTimeout); + } + }); + resultFuture.set(deviceStateData); + } + + @Override + public void onFailure(Throwable t) { + resultFuture.setException(t); + } + }, + dbCallbackExecutorService); + return resultFuture; + }, dbCallbackExecutorService); } private Function, DeviceStateData> extractDeviceStateData(Device device) { return new Function, DeviceStateData>() { - @Nullable + @Nonnull @Override public DeviceStateData apply(@Nullable List data) { try { long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L); long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L); long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)); - // voba - fix to use timeseries/attributes for inactivity timeout - if (persistToTelemetry && inactivityTimeout == TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)) { - try { - Optional inactivityTimeoutOpt = - attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), SERVER_SCOPE, INACTIVITY_TIMEOUT).get(); - if (inactivityTimeoutOpt.isPresent() && inactivityTimeoutOpt.get().getLongValue().isPresent() - && inactivityTimeoutOpt.get().getLongValue().get() > 0) { - inactivityTimeout = inactivityTimeoutOpt.get().getLongValue().get(); - } - } catch (Exception ignored) { - } - } - // TODO: voba - do we need to calculate it or it's better to fetch from DB directly? - // boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout; - boolean active = getEntryValue(data, ACTIVITY_STATE, false); + //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state + final boolean active = getEntryValue(data, ACTIVITY_STATE, false); DeviceState deviceState = DeviceState.builder() .active(active) .lastConnectTime(getEntryValue(data, LAST_CONNECT_TIME, 0L)) @@ -587,7 +616,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener