default state service: refactored events order

This commit is contained in:
Sergey Matvienko 2021-05-17 14:30:22 +03:00 committed by Andrew Shvayka
parent a9c655b179
commit 133eddb855

View File

@ -134,7 +134,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
private int initFetchPackSize; private int initFetchPackSize;
private ListeningScheduledExecutorService scheduledExecutor; private ListeningScheduledExecutorService scheduledExecutor;
private ExecutorService dbCallbackExecutorService; private ExecutorService deviceStateExecutor;
private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>(); private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>(); private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
@ -159,7 +159,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@PostConstruct @PostConstruct
public void init() { public void init() {
dbCallbackExecutorService = Executors.newFixedThreadPool( deviceStateExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state")); Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
// Should be always single threaded due to absence of locks. // Should be always single threaded due to absence of locks.
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled"))); scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled")));
@ -168,8 +168,8 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@PreDestroy @PreDestroy
public void stop() { public void stop() {
if (dbCallbackExecutorService != null) { if (deviceStateExecutor != null) {
dbCallbackExecutorService.shutdownNow(); deviceStateExecutor.shutdownNow();
} }
if (scheduledExecutor != null) { if (scheduledExecutor != null) {
scheduledExecutor.shutdownNow(); scheduledExecutor.shutdownNow();
@ -193,13 +193,13 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@Override @Override
public void onDeviceConnect(DeviceId deviceId) { public void onDeviceConnect(DeviceId deviceId) {
log.trace("on Device Connect [{}]", deviceId.getId());
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) { long ts = System.currentTimeMillis();
long ts = System.currentTimeMillis(); stateData.getState().setLastConnectTime(ts);
stateData.getState().setLastConnectTime(ts); save(deviceId, LAST_CONNECT_TIME, ts);
pushRuleEngineMessage(stateData, CONNECT_EVENT); pushRuleEngineMessage(stateData, CONNECT_EVENT);
save(deviceId, LAST_CONNECT_TIME, ts); checkAndUpdateState(deviceId, stateData);
}
} }
@Override @Override
@ -232,32 +232,21 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@Override @Override
public void onDeviceDisconnect(DeviceId deviceId) { public void onDeviceDisconnect(DeviceId deviceId) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) { long ts = System.currentTimeMillis();
long ts = System.currentTimeMillis(); stateData.getState().setLastDisconnectTime(ts);
stateData.getState().setLastDisconnectTime(ts); save(deviceId, LAST_DISCONNECT_TIME, ts);
pushRuleEngineMessage(stateData, DISCONNECT_EVENT); pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
save(deviceId, LAST_DISCONNECT_TIME, ts);
}
} }
@Override @Override
public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) { public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
if (inactivityTimeout == 0L) { if (inactivityTimeout <= 0L) {
return; return;
} }
log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout); log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout);
DeviceStateData stateData = deviceStates.get(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("on Device Activity Timeout Update - fetched state {} for device {}", stateData, deviceId); stateData.getState().setInactivityTimeout(inactivityTimeout);
if (stateData != null) { checkAndUpdateState(deviceId, stateData);
long ts = System.currentTimeMillis();
DeviceState state = stateData.getState();
state.setInactivityTimeout(inactivityTimeout);
boolean oldActive = state.isActive();
state.setActive(isActive(ts, state));
if (!oldActive && state.isActive() || oldActive && !state.isActive()) {
save(deviceId, ACTIVITY_STATE, state.isActive());
}
}
} }
@Override @Override
@ -291,7 +280,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
log.warn("Failed to register device to the state service", t); log.warn("Failed to register device to the state service", t);
callback.onFailure(t); callback.onFailure(t);
} }
}, dbCallbackExecutorService); }, deviceStateExecutor);
} else if (proto.getUpdated()) { } else if (proto.getUpdated()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId()); DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
if (stateData != null) { if (stateData != null) {
@ -396,51 +385,69 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData();
for (Tenant tenant : tenants) { for (Tenant tenant : tenants) {
log.debug("Finding devices for tenant [{}]", tenant.getName()); log.debug("Finding devices for tenant [{}]", tenant.getName());
PageLink pageLink = new PageLink(initFetchPackSize); final PageLink pageLink = new PageLink(initFetchPackSize);
while (pageLink != null) { scheduledExecutor.submit(() -> submitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor));
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>();
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink);
pageLink = page.hasNext() ? pageLink.nextPageLink() : null;
for (Device device : page.getData()) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId());
if (addedPartitions.contains(tpi)) {
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi);
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<DeviceStateData, Void>() {
@Nullable
@Override
public Void apply(@Nullable DeviceStateData state) {
if (state != null) {
addDeviceUsingState(tpi, state);
if (state.getState().isActive()) {
updateInactivityStateIfExpired(System.currentTimeMillis(), device.getId(), state);
} else {
//trying to fix activity state
if (isActive(System.currentTimeMillis(), state.getState())) {
updateActivityState(device.getId(), state, state.getState().getLastActivityTime());
}
}
} else {
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId());
}
return null;
}
}, dbCallbackExecutorService);
fetchFutures.add(future);
} else {
log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi);
}
}
try {
Futures.successfulAsList(fetchFutures).get();
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to init device state service from DB", e);
}
}
} }
return true; return true;
} }
private void submitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) {
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>();
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink);
for (Device device : page.getData()) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId());
if (addedPartitions.contains(tpi)) {
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi);
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<DeviceStateData, Void>() {
@Nullable
@Override
public Void apply(@Nullable DeviceStateData state) {
if (state != null) {
addDeviceUsingState(tpi, state);
checkAndUpdateState(device.getId(), state);
} else {
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId());
}
return null;
}
}, deviceStateExecutor);
fetchFutures.add(future);
} else {
log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi);
}
}
Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<List<Void>>() {
@Override
public void onSuccess(List<Void> result) {
log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size());
}
@Override
public void onFailure(Throwable t) {
log.warn("[" + tenant.getId() + "] Failed to init device state service from DB", t);
log.warn("[{}] Failed to init device state service from DB", tenant.getId(), t);
}
}, deviceStateExecutor);
final PageLink nextPageLink = page.hasNext() ? pageLink.nextPageLink() : null;
if (nextPageLink != null) {
executor.submit(() -> submitNextPage(addedPartitions, tenant, nextPageLink, executor));
}
}
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
if (state.getState().isActive()) {
updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state);
} else {
//trying to fix activity state
if (isActive(System.currentTimeMillis(), state.getState())) {
updateActivityState(deviceId, state, state.getState().getLastActivityTime());
}
}
}
private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) { private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
partitionedDevices.computeIfAbsent(tpi, id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); partitionedDevices.computeIfAbsent(tpi, id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId());
deviceStates.put(state.getDeviceId(), state); deviceStates.put(state.getDeviceId(), state);
@ -467,9 +474,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
if (!isActive(ts, state) && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) { if (!isActive(ts, state) && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
state.setActive(false); state.setActive(false);
state.setLastInactivityAlarmTime(ts); state.setLastInactivityAlarmTime(ts);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
save(deviceId, INACTIVITY_ALARM_TIME, ts); save(deviceId, INACTIVITY_ALARM_TIME, ts);
save(deviceId, ACTIVITY_STATE, false); save(deviceId, ACTIVITY_STATE, false);
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
} }
} else { } else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId); log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
@ -484,7 +491,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@Nonnull @Nonnull
private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) { private DeviceStateData getOrFetchDeviceStateData(DeviceId deviceId) {
DeviceStateData deviceStateData = deviceStates.get(deviceId); DeviceStateData deviceStateData = getOrFetchDeviceStateData(deviceId);
if (deviceStateData != null) { if (deviceStateData != null) {
return deviceStateData; return deviceStateData;
} }
@ -530,10 +537,10 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
ListenableFuture<DeviceStateData> future; ListenableFuture<DeviceStateData> future;
if (persistToTelemetry) { if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
future = Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService); future = Futures.transform(tsData, extractDeviceStateData(device), deviceStateExecutor);
} else { } else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
future = Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService); future = Futures.transform(attrData, extractDeviceStateData(device), deviceStateExecutor);
} }
return transformInactivityTimeout(future); return transformInactivityTimeout(future);
} }
@ -563,9 +570,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
resultFuture.setException(t); resultFuture.setException(t);
} }
}, },
dbCallbackExecutorService); deviceStateExecutor);
return resultFuture; return resultFuture;
}, dbCallbackExecutorService); }, deviceStateExecutor);
} }
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) { private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {