Refactor SNMP devices' sessions establishing

This commit is contained in:
Viacheslav Klimov 2021-04-29 11:12:53 +03:00 committed by Andrew Shvayka
parent 3b74a806bc
commit 1de97ad0e1
2 changed files with 20 additions and 35 deletions

View File

@ -55,7 +55,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;
@TbSnmpTransportComponent
@Component
@ -72,25 +71,30 @@ public class SnmpTransportContext extends TransportContext {
private final SnmpAuthService snmpAuthService;
private final Map<DeviceId, DeviceSessionContext> sessions = new ConcurrentHashMap<>();
private Collection<DeviceId> allSnmpDevicesIds = new ConcurrentLinkedDeque<>();
private final Collection<DeviceId> allSnmpDevicesIds = new ConcurrentLinkedDeque<>();
@AfterStartUp(order = 2)
public void initDevicesSessions() {
public void fetchDevicesAndEstablishSessions() {
log.info("Initializing SNMP devices sessions");
allSnmpDevicesIds = protoEntityService.getAllSnmpDevicesIds().stream()
.map(DeviceId::new)
.collect(Collectors.toList());
log.trace("Found all SNMP devices ids: {}", allSnmpDevicesIds);
List<DeviceId> managedDevicesIds = allSnmpDevicesIds.stream()
.filter(deviceId -> balancingService.isManagedByCurrentTransport(deviceId.getId()))
.collect(Collectors.toList());
log.info("SNMP devices managed by current SNMP transport: {}", managedDevicesIds);
int batchIndex = 0;
int batchSize = 512;
boolean nextBatchExists = true;
managedDevicesIds.stream()
.map(protoEntityService::getDeviceById)
.collect(Collectors.toList())
.forEach(this::establishDeviceSession);
while (nextBatchExists) {
TransportProtos.GetSnmpDevicesResponseMsg snmpDevicesResponse = protoEntityService.getSnmpDevicesIds(batchIndex, batchSize);
snmpDevicesResponse.getIdsList().stream()
.map(id -> new DeviceId(UUID.fromString(id)))
.peek(allSnmpDevicesIds::add)
.filter(deviceId -> balancingService.isManagedByCurrentTransport(deviceId.getId()))
.map(protoEntityService::getDeviceById)
.forEach(device -> getExecutor().execute(() -> establishDeviceSession(device)));
nextBatchExists = snmpDevicesResponse.getHasNextPage();
batchIndex++;
}
log.debug("Found all SNMP devices ids: {}", allSnmpDevicesIds);
}
private void establishDeviceSession(Device device) {

View File

@ -81,26 +81,7 @@ public class ProtoTransportEntityService {
.orElseThrow(() -> new IllegalArgumentException("Device credentials not found"));
}
public List<UUID> getAllSnmpDevicesIds() {
List<UUID> result = new ArrayList<>();
int page = 0;
int pageSize = 512;
boolean hasNextPage = true;
while (hasNextPage) {
TransportProtos.GetSnmpDevicesResponseMsg responseMsg = requestSnmpDevicesIds(page, pageSize);
result.addAll(responseMsg.getIdsList().stream()
.map(UUID::fromString)
.collect(Collectors.toList()));
hasNextPage = responseMsg.getHasNextPage();
page++;
}
return result;
}
private TransportProtos.GetSnmpDevicesResponseMsg requestSnmpDevicesIds(int page, int pageSize) {
public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(int page, int pageSize) {
TransportProtos.GetSnmpDevicesRequestMsg requestMsg = TransportProtos.GetSnmpDevicesRequestMsg.newBuilder()
.setPage(page)
.setPageSize(pageSize)