Merge pull request #8449 from YevhenBondarenko/fix/device-state

fixed using default timeout and improvements
This commit is contained in:
Andrew Shvayka 2023-05-02 13:28:41 +03:00 committed by GitHub
commit 15ce59b64f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -86,6 +86,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -294,7 +295,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
return; return;
} }
if (inactivityTimeout <= 0L) { if (inactivityTimeout <= 0L) {
inactivityTimeout = defaultInactivityTimeoutInSec; inactivityTimeout = defaultInactivityTimeoutMs;
} }
log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout); log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout);
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
@ -442,6 +443,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>(); Map<TenantId, Pair<AtomicInteger, AtomicInteger>> devicesActivity = new HashMap<>();
partitionedEntities.forEach((tpi, deviceIds) -> { partitionedEntities.forEach((tpi, deviceIds) -> {
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
Set<DeviceId> idsFromRemovedTenant = new HashSet<>();
for (DeviceId deviceId : deviceIds) { for (DeviceId deviceId : deviceIds) {
DeviceStateData stateData; DeviceStateData stateData;
try { try {
@ -453,9 +455,11 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
try { try {
updateInactivityStateIfExpired(ts, deviceId, stateData); updateInactivityStateIfExpired(ts, deviceId, stateData);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] Failed to update inactivity state", deviceId, e);
if (e instanceof TenantNotFoundException) { if (e instanceof TenantNotFoundException) {
idsFromRemovedTenant.add(deviceId);
continue; continue;
} else {
log.warn("[{}] Failed to update inactivity state [{}]", deviceId, e.getMessage());
} }
} }
Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(), Pair<AtomicInteger, AtomicInteger> tenantDevicesActivity = devicesActivity.computeIfAbsent(stateData.getTenantId(),
@ -466,6 +470,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
tenantDevicesActivity.getRight().incrementAndGet(); tenantDevicesActivity.getRight().incrementAndGet();
} }
} }
deviceIds.removeAll(idsFromRemovedTenant);
}); });
devicesActivity.forEach((tenantId, tenantDevicesActivity) -> { devicesActivity.forEach((tenantId, tenantDevicesActivity) -> {
int active = tenantDevicesActivity.getLeft().get(); int active = tenantDevicesActivity.getLeft().get();