Send large SNMP requests in batches

This commit is contained in:
ViacheslavKlimov 2023-06-12 21:09:57 +03:00
parent 7675a57316
commit 42183a0e64
8 changed files with 75 additions and 80 deletions

View File

@ -959,6 +959,7 @@ transport:
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}"
stats:
enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"
print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}"

View File

@ -178,17 +178,10 @@ public class SnmpTransportContext extends TransportContext {
@Override
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (msg.hasDeviceInfo()) {
SessionInfoProto sessionInfo = SessionInfoCreator.create(
msg, SnmpTransportContext.this, UUID.randomUUID()
);
transportService.registerAsyncSession(sessionInfo, deviceSessionContext);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
deviceSessionContext.setSessionInfo(sessionInfo);
deviceSessionContext.setDeviceInfo(msg.getDeviceInfo());
deviceSessionContext.setConnected(true);
registerTransportSession(deviceSessionContext, msg);
deviceSessionContext.setSessionTimeoutHandler(() -> {
registerTransportSession(deviceSessionContext, msg);
});
} else {
log.warn("[{}] Failed to process device auth", deviceSessionContext.getDeviceId());
}
@ -201,6 +194,21 @@ public class SnmpTransportContext extends TransportContext {
});
}
private void registerTransportSession(DeviceSessionContext deviceSessionContext, ValidateDeviceCredentialsResponse msg) {
SessionInfoProto sessionInfo = SessionInfoCreator.create(
msg, SnmpTransportContext.this, UUID.randomUUID()
);
log.debug("Registering transport session: {}", sessionInfo);
transportService.registerAsyncSession(sessionInfo, deviceSessionContext);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
deviceSessionContext.setSessionInfo(sessionInfo);
deviceSessionContext.setDeviceInfo(msg.getDeviceInfo());
deviceSessionContext.setConnected(true);
}
@EventListener(DeviceUpdatedEvent.class)
public void onDeviceUpdatedOrCreated(DeviceUpdatedEvent deviceUpdatedEvent) {
Device device = deviceUpdatedEvent.getDevice();

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.snmp.service;
import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU;
@ -25,6 +26,7 @@ import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.Variable;
import org.snmp4j.smi.VariableBinding;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.kv.DataType;
@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicatio
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -47,21 +50,30 @@ import java.util.stream.IntStream;
@Service
@Slf4j
public class PduService {
public PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
PDU pdu = setUpPdu(sessionContext);
pdu.setType(communicationConfig.getMethod().getCode());
pdu.addAll(communicationConfig.getAllMappings().stream()
.filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
.map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
.map(value -> {
Variable variable = toSnmpVariable(value, mapping.getDataType());
return new VariableBinding(new OID(mapping.getOid()), variable);
})
.orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
.collect(Collectors.toList()));
@Value("${transport.snmp.max_request_oids:100}")
private int maxRequestOids;
return pdu;
public List<PDU> createPdus(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
List<PDU> pdus = new ArrayList<>();
List<SnmpMapping> allMappings = communicationConfig.getAllMappings();
for (List<SnmpMapping> mappings : Lists.partition(allMappings, maxRequestOids)) {
PDU pdu = setUpPdu(sessionContext);
pdu.setType(communicationConfig.getMethod().getCode());
pdu.addAll(mappings.stream()
.filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
.map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
.map(value -> {
Variable variable = toSnmpVariable(value, mapping.getDataType());
return new VariableBinding(new OID(mapping.getOid()), variable);
})
.orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
.collect(Collectors.toList()));
pdus.add(pdu);
}
return pdus;
}
public PDU createSingleVariablePdu(DeviceSessionContext sessionContext, SnmpMethod snmpMethod, String oid, String value, DataType dataType) {

View File

@ -45,7 +45,6 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
@ -161,18 +160,20 @@ public class SnmpTransportService implements TbTransportService {
}
private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
PDU request = pduService.createPdu(sessionContext, communicationConfig, values);
List<PDU> request = pduService.createPdus(sessionContext, communicationConfig, values);
RequestInfo requestInfo = new RequestInfo(communicationConfig.getSpec(), communicationConfig.getAllMappings());
sendRequest(sessionContext, request, requestInfo);
}
private void sendRequest(DeviceSessionContext sessionContext, PDU request, RequestInfo requestInfo) {
if (request.size() > 0) {
log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), request.getVariableBindings());
try {
snmp.send(request, sessionContext.getTarget(), requestInfo, sessionContext);
} catch (IOException e) {
log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString());
private void sendRequest(DeviceSessionContext sessionContext, List<PDU> request, RequestInfo requestInfo) {
for (PDU pdu : request) {
if (pdu.size() > 0) {
log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), pdu.getVariableBindings());
try {
snmp.send(pdu, sessionContext.getTarget(), requestInfo, sessionContext);
} catch (IOException e) {
log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString());
}
}
}
}
@ -216,7 +217,7 @@ public class SnmpTransportService implements TbTransportService {
PDU request = pduService.createSingleVariablePdu(sessionContext, snmpMethod, oid, value, dataType);
RequestInfo requestInfo = new RequestInfo(toDeviceRpcRequestMsg.getRequestId(), communicationConfig.getSpec(), communicationConfig.getAllMappings());
sendRequest(sessionContext, request, requestInfo);
sendRequest(sessionContext, List.of(request), requestInfo);
}
@ -247,7 +248,7 @@ public class SnmpTransportService implements TbTransportService {
JsonObject responseData = responseDataMappers.get(requestInfo.getCommunicationSpec()).map(response, requestInfo);
if (responseData.entrySet().isEmpty()) {
log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID());
log.debug("No values in the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID());
return;
}
@ -302,11 +303,7 @@ public class SnmpTransportService implements TbTransportService {
}
private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
transportService.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(true)
.setRpcSubscription(true)
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
transportService.reportActivity(sessionInfo);
}

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
@ -63,6 +64,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
private final AtomicInteger msgIdSeq = new AtomicInteger(0);
@Getter
private boolean isActive = true;
@Setter
private Runnable sessionTimeoutHandler;
@Getter
private final List<ScheduledFuture<?>> queryingTasks = new LinkedList<>();
@ -137,6 +140,11 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
if (sessionCloseNotification.getMessage().equals(DefaultTransportService.SESSION_EXPIRED_MESSAGE)) {
if (sessionTimeoutHandler != null) {
sessionTimeoutHandler.run();
}
}
}
@Override

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.transport.snmp;
import org.snmp4j.CommandResponderEvent;
import org.snmp4j.CommunityTarget;
import org.snmp4j.PDU;
import org.snmp4j.Snmp;
@ -35,7 +34,6 @@ import org.snmp4j.agent.mo.snmp.SnmpTargetMIB;
import org.snmp4j.agent.mo.snmp.StorageType;
import org.snmp4j.agent.mo.snmp.VacmMIB;
import org.snmp4j.agent.security.MutableVACM;
import org.snmp4j.mp.MPv3;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.security.SecurityLevel;
import org.snmp4j.security.SecurityModel;
@ -53,27 +51,11 @@ import org.snmp4j.transport.TransportMappings;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@SuppressWarnings("deprecation")
public class SnmpDeviceSimulatorV2 extends BaseAgent {
public static class RequestProcessor extends CommandProcessor {
private final Consumer<CommandResponderEvent> processor;
public RequestProcessor(Consumer<CommandResponderEvent> processor) {
super(new OctetString(MPv3.createLocalEngineID()));
this.processor = processor;
}
@Override
public void processPdu(CommandResponderEvent event) {
processor.accept(event);
}
}
private final Target target;
private final Address address;
private Snmp snmp;
@ -81,10 +63,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
private final String password;
public SnmpDeviceSimulatorV2(int port, String password) throws IOException {
super(new File("conf.agent"), new File("bootCounter.agent"), new RequestProcessor(event -> {
System.out.println("aboba");
((Snmp) event.getSource()).cancel(event.getPDU(), event1 -> System.out.println("canceled"));
}));
super(new File("conf.agent"), new File("bootCounter.agent"), new CommandProcessor(new OctetString("12312")));
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString(password));
this.address = GenericAddress.parse("udp:0.0.0.0/" + port);

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.snmp;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
@ -24,24 +25,12 @@ public class SnmpTestV2 {
SnmpDeviceSimulatorV2 device = new SnmpDeviceSimulatorV2(1610, "public");
device.start();
device.setUpMappings(Map.of(
".1.3.6.1.2.1.1.1.50", "12",
".1.3.6.1.2.1.2.1.52", "56",
".1.3.6.1.2.1.3.1.54", "yes",
".1.3.6.1.2.1.7.1.58", ""
));
// while (true) {
// new Scanner(System.in).nextLine();
// device.sendTrap("127.0.0.1", 1062, Map.of(".1.3.6.1.2.87.1.56", "12"));
// System.out.println("sent");
// }
// Snmp snmp = new Snmp(device.transportMappings[0]);
// device.snmp.addCommandResponder(event -> {
// System.out.println(event);
// });
Map<String, String> mappings = new HashMap<>();
for (int i = 1; i <= 500; i++) {
String oid = String.format(".1.3.6.1.2.1.%s.1.52", i);
mappings.put(oid, "value_" + i);
}
device.setUpMappings(mappings);
new Scanner(System.in).nextLine();
}

View File

@ -93,6 +93,7 @@ transport:
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"