default state service: refactor async DB call with Futures transformAsync

This commit is contained in:
Sergey Matvienko 2021-05-17 09:36:19 +03:00 committed by Andrew Shvayka
parent ff54d190c9
commit 254c87e29f

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -61,6 +62,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -401,9 +403,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@Nullable
@Override
public Void apply(@Nullable DeviceStateData state) {
log.debug("[{}][{}] Fetched state from DB [{}]", device.getName(), device.getId(), state);
if (state != null) {
addDeviceUsingState(tpi, state);
//updateState(device.getId()); //TODO update activity or inactivity state
} else {
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId());
}
@ -430,27 +432,35 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
deviceStates.put(state.getDeviceId(), state);
}
private void updateState() {
long ts = System.currentTimeMillis();
Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
void updateState() {
final long ts = System.currentTimeMillis();
log.debug("Calculating state updates for {} devices", deviceStates.size());
Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
for (DeviceId deviceId : deviceIds) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("Processing state {} for device {}", stateData, deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, state.isActive());
}
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
deviceStates.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
updateState(ts, deviceId);
}
}
void updateState(DeviceId deviceId) {
updateState(System.currentTimeMillis(), deviceId);
}
void updateState(long ts, DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("Processing state {} for device {}", stateData, deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, state.isActive());
}
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
deviceStates.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
}
}
@ -492,39 +502,58 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
}
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
ListenableFuture<DeviceStateData> future;
if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
return Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService);
future = Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService);
} else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
return Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService);
future = Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService);
}
return transformInactivityTimeout(future);
}
// voba - schwarz fix to use timeseries/attributes for inactivity timeout
private ListenableFuture<DeviceStateData> transformInactivityTimeout(ListenableFuture<DeviceStateData> future) {
return Futures.transformAsync(future, deviceStateData -> {
if (!persistToTelemetry || deviceStateData.getState().getInactivityTimeout() != TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)) {
return future; //fail fast
}
final SettableFuture<DeviceStateData> resultFuture = SettableFuture.create();
Futures.addCallback(
attributesService.find(TenantId.SYS_TENANT_ID, deviceStateData.getDeviceId(), SERVER_SCOPE, INACTIVITY_TIMEOUT),
new FutureCallback<Optional<AttributeKvEntry>>() {
@Override
public void onSuccess(Optional<AttributeKvEntry> inactivityTimeoutOpt) {
inactivityTimeoutOpt.flatMap(KvEntry::getLongValue).ifPresent((inactivityTimeout) -> {
if (inactivityTimeout > 0) {
deviceStateData.getState().setInactivityTimeout(inactivityTimeout);
}
});
resultFuture.set(deviceStateData);
}
@Override
public void onFailure(Throwable t) {
resultFuture.setException(t);
}
},
dbCallbackExecutorService);
return resultFuture;
}, dbCallbackExecutorService);
}
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
return new Function<List<T>, DeviceStateData>() {
@Nullable
@Nonnull
@Override
public DeviceStateData apply(@Nullable List<T> data) {
try {
long lastActivityTime = getEntryValue(data, LAST_ACTIVITY_TIME, 0L);
long inactivityAlarmTime = getEntryValue(data, INACTIVITY_ALARM_TIME, 0L);
long inactivityTimeout = getEntryValue(data, INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
// voba - fix to use timeseries/attributes for inactivity timeout
if (persistToTelemetry && inactivityTimeout == TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)) {
try {
Optional<AttributeKvEntry> inactivityTimeoutOpt =
attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), SERVER_SCOPE, INACTIVITY_TIMEOUT).get();
if (inactivityTimeoutOpt.isPresent() && inactivityTimeoutOpt.get().getLongValue().isPresent()
&& inactivityTimeoutOpt.get().getLongValue().get() > 0) {
inactivityTimeout = inactivityTimeoutOpt.get().getLongValue().get();
}
} catch (Exception ignored) {
}
}
// TODO: voba - do we need to calculate it or it's better to fetch from DB directly?
// boolean active = System.currentTimeMillis() < lastActivityTime + inactivityTimeout;
boolean active = getEntryValue(data, ACTIVITY_STATE, false);
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
final boolean active = getEntryValue(data, ACTIVITY_STATE, false);
DeviceState deviceState = DeviceState.builder()
.active(active)
.lastConnectTime(getEntryValue(data, LAST_CONNECT_TIME, 0L))
@ -587,7 +616,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
data = JacksonUtil.toString(state);
}
TbMsgMetaData md = stateData.getMetaData().copy();
if(!persistToTelemetry){
if (!persistToTelemetry) {
md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
}
TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getCustomerId(), md, TbMsgDataType.JSON, data);