Improvement to Device State Service

This commit is contained in:
Andrii Shvaika 2022-08-04 16:18:15 +03:00
parent 2c8b938540
commit ad590da509
2 changed files with 30 additions and 15 deletions

View File

@ -126,7 +126,7 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
}
List<ListenableFuture<?>> fetchTasks = partitionedFetchTasks.remove(partition);
if (fetchTasks != null) {
fetchTasks.forEach(f -> f.cancel(true));
fetchTasks.forEach(f -> f.cancel(false));
}
partitionListChanged = true;
}

View File

@ -307,8 +307,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Override
public void onSuccess(@Nullable DeviceStateData state) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
if (partitionedEntities.containsKey(tpi)) {
addDeviceUsingState(tpi, state);
if (addDeviceUsingState(tpi, state)) {
save(deviceId, ACTIVITY_STATE, false);
callback.onSuccess();
} else {
@ -361,25 +360,40 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
// hard-coded limit of 1000 is due to the Entity Data Query limitations and should not be changed.
for (List<DeviceIdInfo> partition : Lists.partition(entry.getValue(), 1000)) {
log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size());
DevicePackFutureHolder devicePackFutureHolder = new DevicePackFutureHolder();
var devicePackFuture = deviceStateExecutor.submit(() -> {
List<DeviceStateData> states;
if (persistToTelemetry && !dbTypeInfoComponent.isLatestTsDaoStoredToSql()) {
states = fetchDeviceStateDataUsingSeparateRequests(partition);
} else {
states = fetchDeviceStateDataUsingEntityDataQuery(partition);
try {
List<DeviceStateData> states;
if (persistToTelemetry && !dbTypeInfoComponent.isLatestTsDaoStoredToSql()) {
states = fetchDeviceStateDataUsingSeparateRequests(partition);
} else {
states = fetchDeviceStateDataUsingEntityDataQuery(partition);
}
if (devicePackFutureHolder.future != null && !devicePackFutureHolder.future.isCancelled()) {
for (var state : states) {
if (!addDeviceUsingState(entry.getKey(), state)) {
return;
}
checkAndUpdateState(state.getDeviceId(), state);
}
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
}
} catch (Throwable t) {
log.error("Unexpected exception while device pack fetching", t);
throw t;
}
for (var state : states) {
addDeviceUsingState(entry.getKey(), state);
checkAndUpdateState(state.getDeviceId(), state);
}
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
});
devicePackFutureHolder.future = devicePackFuture;
result.computeIfAbsent(entry.getKey(), tmp -> new ArrayList<>()).add(devicePackFuture);
}
}
return result;
}
private static class DevicePackFutureHolder {
private volatile ListenableFuture<?> future;
}
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
if (state.getState().isActive()) {
updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, state);
@ -391,14 +405,15 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
}
}
private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
if (deviceIds != null) {
deviceIds.add(state.getDeviceId());
deviceStates.putIfAbsent(state.getDeviceId(), state);
return true;
} else {
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
throw new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!");
return false;
}
}