From 133eddb8553a1c7825d2c35fd5da8d1c391d0503 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 17 May 2021 14:30:22 +0300 Subject: [PATCH] default state service: refactored events order --- .../state/DefaultDeviceStateService.java | 159 +++++++++--------- 1 file changed, 83 insertions(+), 76 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index d7a92378f7..286260cf8e 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -134,7 +134,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener> partitionedDevices = new ConcurrentHashMap<>(); private final ConcurrentMap deviceStates = new ConcurrentHashMap<>(); private final ConcurrentMap deviceLastSavedActivity = new ConcurrentHashMap<>(); @@ -159,7 +159,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); for (Tenant tenant : tenants) { log.debug("Finding devices for tenant [{}]", tenant.getName()); - PageLink pageLink = new PageLink(initFetchPackSize); - while (pageLink != null) { - List> fetchFutures = new ArrayList<>(); - PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); - pageLink = page.hasNext() ? pageLink.nextPageLink() : null; - for (Device device : page.getData()) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); - if (addedPartitions.contains(tpi)) { - log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); - ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function() { - @Nullable - @Override - public Void apply(@Nullable DeviceStateData state) { - if (state != null) { - addDeviceUsingState(tpi, state); + final PageLink pageLink = new PageLink(initFetchPackSize); + scheduledExecutor.submit(() -> submitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor)); - if (state.getState().isActive()) { - updateInactivityStateIfExpired(System.currentTimeMillis(), device.getId(), state); - } else { - //trying to fix activity state - if (isActive(System.currentTimeMillis(), state.getState())) { - updateActivityState(device.getId(), state, state.getState().getLastActivityTime()); - } - } - } else { - log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); - } - return null; - } - }, dbCallbackExecutorService); - fetchFutures.add(future); - } else { - log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); - } - } - try { - Futures.successfulAsList(fetchFutures).get(); - } catch (InterruptedException | ExecutionException e) { - log.warn("Failed to init device state service from DB", e); - } - } } return true; } + private void submitNextPage(final Set addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) { + List> fetchFutures = new ArrayList<>(); + PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); + for (Device device : page.getData()) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); + if (addedPartitions.contains(tpi)) { + log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); + ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function() { + @Nullable + @Override + public Void apply(@Nullable DeviceStateData state) { + if (state != null) { + addDeviceUsingState(tpi, state); + checkAndUpdateState(device.getId(), state); + } else { + log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); + } + return null; + } + }, deviceStateExecutor); + fetchFutures.add(future); + } else { + log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); + } + } + + Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback>() { + @Override + public void onSuccess(List result) { + log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size()); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[" + tenant.getId() + "] Failed to init device state service from DB", t); + log.warn("[{}] Failed to init device state service from DB", tenant.getId(), t); + } + }, deviceStateExecutor); + + final PageLink nextPageLink = page.hasNext() ? pageLink.nextPageLink() : null; + if (nextPageLink != null) { + executor.submit(() -> submitNextPage(addedPartitions, tenant, nextPageLink, executor)); + } + } + + void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) { + if (state.getState().isActive()) { + updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state); + } else { + //trying to fix activity state + if (isActive(System.currentTimeMillis(), state.getState())) { + updateActivityState(deviceId, state, state.getState().getLastActivityTime()); + } + } + } + private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) { partitionedDevices.computeIfAbsent(tpi, id -> ConcurrentHashMap.newKeySet()).add(state.getDeviceId()); deviceStates.put(state.getDeviceId(), state); @@ -467,9 +474,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener future; if (persistToTelemetry) { ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - future = Futures.transform(tsData, extractDeviceStateData(device), dbCallbackExecutorService); + future = Futures.transform(tsData, extractDeviceStateData(device), deviceStateExecutor); } else { ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - future = Futures.transform(attrData, extractDeviceStateData(device), dbCallbackExecutorService); + future = Futures.transform(attrData, extractDeviceStateData(device), deviceStateExecutor); } return transformInactivityTimeout(future); } @@ -563,9 +570,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener Function, DeviceStateData> extractDeviceStateData(Device device) {