From 1de97ad0e1586d3917755cc257ff0bbf8d5e43e7 Mon Sep 17 00:00:00 2001 From: Viacheslav Klimov Date: Thu, 29 Apr 2021 11:12:53 +0300 Subject: [PATCH] Refactor SNMP devices' sessions establishing --- .../transport/snmp/SnmpTransportContext.java | 34 +++++++++++-------- .../service/ProtoTransportEntityService.java | 21 +----------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java index 68d4dd3933..0efde4d893 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java @@ -55,7 +55,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.stream.Collectors; @TbSnmpTransportComponent @Component @@ -72,25 +71,30 @@ public class SnmpTransportContext extends TransportContext { private final SnmpAuthService snmpAuthService; private final Map sessions = new ConcurrentHashMap<>(); - private Collection allSnmpDevicesIds = new ConcurrentLinkedDeque<>(); + private final Collection allSnmpDevicesIds = new ConcurrentLinkedDeque<>(); @AfterStartUp(order = 2) - public void initDevicesSessions() { + public void fetchDevicesAndEstablishSessions() { log.info("Initializing SNMP devices sessions"); - allSnmpDevicesIds = protoEntityService.getAllSnmpDevicesIds().stream() - .map(DeviceId::new) - .collect(Collectors.toList()); - log.trace("Found all SNMP devices ids: {}", allSnmpDevicesIds); - List managedDevicesIds = allSnmpDevicesIds.stream() - .filter(deviceId -> balancingService.isManagedByCurrentTransport(deviceId.getId())) - .collect(Collectors.toList()); - log.info("SNMP devices managed by current SNMP transport: {}", managedDevicesIds); + int batchIndex = 0; + int batchSize = 512; + boolean nextBatchExists = true; - managedDevicesIds.stream() - .map(protoEntityService::getDeviceById) - .collect(Collectors.toList()) - .forEach(this::establishDeviceSession); + while (nextBatchExists) { + TransportProtos.GetSnmpDevicesResponseMsg snmpDevicesResponse = protoEntityService.getSnmpDevicesIds(batchIndex, batchSize); + snmpDevicesResponse.getIdsList().stream() + .map(id -> new DeviceId(UUID.fromString(id))) + .peek(allSnmpDevicesIds::add) + .filter(deviceId -> balancingService.isManagedByCurrentTransport(deviceId.getId())) + .map(protoEntityService::getDeviceById) + .forEach(device -> getExecutor().execute(() -> establishDeviceSession(device))); + + nextBatchExists = snmpDevicesResponse.getHasNextPage(); + batchIndex++; + } + + log.debug("Found all SNMP devices ids: {}", allSnmpDevicesIds); } private void establishDeviceSession(Device device) { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java index 957d610f91..777650f779 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java @@ -81,26 +81,7 @@ public class ProtoTransportEntityService { .orElseThrow(() -> new IllegalArgumentException("Device credentials not found")); } - public List getAllSnmpDevicesIds() { - List result = new ArrayList<>(); - - int page = 0; - int pageSize = 512; - boolean hasNextPage = true; - - while (hasNextPage) { - TransportProtos.GetSnmpDevicesResponseMsg responseMsg = requestSnmpDevicesIds(page, pageSize); - result.addAll(responseMsg.getIdsList().stream() - .map(UUID::fromString) - .collect(Collectors.toList())); - hasNextPage = responseMsg.getHasNextPage(); - page++; - } - - return result; - } - - private TransportProtos.GetSnmpDevicesResponseMsg requestSnmpDevicesIds(int page, int pageSize) { + public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(int page, int pageSize) { TransportProtos.GetSnmpDevicesRequestMsg requestMsg = TransportProtos.GetSnmpDevicesRequestMsg.newBuilder() .setPage(page) .setPageSize(pageSize)