Additional logging for device state services

This commit is contained in:
Volodymyr Babak 2021-05-13 17:39:44 +03:00 committed by Andrew Shvayka
parent e430deceac
commit a96f2d444c

View File

@ -70,12 +70,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -129,9 +131,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
private int initFetchPackSize;
private ListeningScheduledExecutorService queueExecutor;
private ExecutorService executorService;
private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, Long> deviceLastReportedActivity = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
private volatile EventDeduplicationExecutor<Set<TopicPartitionInfo>> deduplicationExecutor;
@ -154,14 +156,19 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
// Should be always single threaded due to absence of locks.
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state")));
queueExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled")));
queueExecutor.scheduleAtFixedRate(this::updateState, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
deduplicationExecutor = new EventDeduplicationExecutor<>(DefaultDeviceStateService.class.getSimpleName(), queueExecutor, this::initStateFromDB);
}
@PreDestroy
public void stop() {
if (executorService != null) {
executorService.shutdownNow();
}
if (queueExecutor != null) {
queueExecutor.shutdownNow();
}
@ -195,10 +202,11 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
@Override
public void onDeviceActivity(DeviceId deviceId, long lastReportedActivity) {
deviceLastReportedActivity.put(deviceId, lastReportedActivity);
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L);
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("on Device Activity - fetched state {} for device {}", stateData, deviceId);
if (stateData != null) {
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
@ -229,7 +237,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
if (inactivityTimeout == 0L) {
return;
}
log.trace("on Device Activity Timeout Update device id {} inactivityTimeout {}", deviceId, inactivityTimeout);
DeviceStateData stateData = deviceStates.get(deviceId);
log.trace("on Device Activity Timeout Update - fetched state {} for device {}", stateData, deviceId);
if (stateData != null) {
long ts = System.currentTimeMillis();
DeviceState state = stateData.getState();
@ -273,7 +283,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
log.warn("Failed to register device to the state service", t);
callback.onFailure(t);
}
}, MoreExecutors.directExecutor());
}, executorService);
} else if (proto.getUpdated()) {
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
if (stateData != null) {
@ -321,7 +331,6 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
Set<DeviceId> devices = partitionedDevices.remove(partition);
devices.forEach(deviceId -> {
deviceStates.remove(deviceId);
deviceLastReportedActivity.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
});
});
@ -349,7 +358,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
}
return null;
}
}, MoreExecutors.directExecutor());
}, executorService);
fetchFutures.add(future);
}
}
@ -380,6 +389,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
log.debug("Calculating state updates for {} devices", deviceStates.size());
for (DeviceId deviceId : deviceIds) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
log.trace("Processing state {} for device {}", stateData, deviceId);
if (stateData != null) {
DeviceState state = stateData.getState();
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
@ -392,7 +402,6 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
} else {
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
deviceStates.remove(deviceId);
deviceLastReportedActivity.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
}
}
@ -429,7 +438,6 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
deviceStates.remove(deviceId);
deviceLastReportedActivity.remove(deviceId);
deviceLastSavedActivity.remove(deviceId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
Set<DeviceId> deviceIdSet = partitionedDevices.get(tpi);
@ -439,10 +447,10 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
return Futures.transform(tsData, extractDeviceStateData(device), executorService);
} else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
return Futures.transform(attrData, extractDeviceStateData(device), executorService);
}
}
@ -467,13 +475,15 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
TbMsgMetaData md = new TbMsgMetaData();
md.putValue("deviceName", device.getName());
md.putValue("deviceType", device.getType());
return DeviceStateData.builder()
DeviceStateData.builder()
.customerId(device.getCustomerId())
.tenantId(device.getTenantId())
.deviceId(device.getId())
.deviceCreationTime(device.getCreatedTime())
.metaData(md)
.state(deviceState).build();
log.trace("[{}] Fetched device state from the DB {}", device.getId(), deviceStateData);
return deviceStateData;
} catch (Exception e) {
log.warn("[{}] Failed to fetch device state data", device.getId(), e);
throw new RuntimeException(e);