diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 712e0e1a91..a71a638974 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -70,12 +70,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -129,9 +131,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener> partitionedDevices = new ConcurrentHashMap<>(); private final ConcurrentMap deviceStates = new ConcurrentHashMap<>(); - private final ConcurrentMap deviceLastReportedActivity = new ConcurrentHashMap<>(); private final ConcurrentMap deviceLastSavedActivity = new ConcurrentHashMap<>(); private volatile EventDeduplicationExecutor> deduplicationExecutor; @@ -154,14 +156,19 @@ public class DefaultDeviceStateService extends TbApplicationEventListener(DefaultDeviceStateService.class.getSimpleName(), queueExecutor, this::initStateFromDB); } @PreDestroy public void stop() { + if (executorService != null) { + executorService.shutdownNow(); + } if (queueExecutor != null) { queueExecutor.shutdownNow(); } @@ -195,10 +202,11 @@ public class DefaultDeviceStateService extends TbApplicationEventListener 0 && lastReportedActivity > lastSavedActivity) { DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); + log.trace("on Device Activity - fetched state {} for device {}", stateData, deviceId); if (stateData != null) { save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity); deviceLastSavedActivity.put(deviceId, lastReportedActivity); @@ -229,7 +237,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener devices = partitionedDevices.remove(partition); devices.forEach(deviceId -> { deviceStates.remove(deviceId); - deviceLastReportedActivity.remove(deviceId); deviceLastSavedActivity.remove(deviceId); }); }); @@ -349,7 +358,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener deviceIdSet = partitionedDevices.get(tpi); @@ -439,10 +447,10 @@ public class DefaultDeviceStateService extends TbApplicationEventListener fetchDeviceState(Device device) { if (persistToTelemetry) { ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + return Futures.transform(tsData, extractDeviceStateData(device), executorService); } else { ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor()); + return Futures.transform(attrData, extractDeviceStateData(device), executorService); } } @@ -467,13 +475,15 @@ public class DefaultDeviceStateService extends TbApplicationEventListener