diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96e409373e..0dc9ce9d25 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -959,6 +959,7 @@ transport: parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}" # to configure SNMP to work over UDP or TCP underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" + max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}" stats: enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" print-interval-ms: "${TB_TRANSPORT_STATS_PRINT_INTERVAL_MS:60000}" diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java index 5dcffbf62f..ba410cf960 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java @@ -178,17 +178,10 @@ public class SnmpTransportContext extends TransportContext { @Override public void onSuccess(ValidateDeviceCredentialsResponse msg) { if (msg.hasDeviceInfo()) { - SessionInfoProto sessionInfo = SessionInfoCreator.create( - msg, SnmpTransportContext.this, UUID.randomUUID() - ); - - transportService.registerAsyncSession(sessionInfo, deviceSessionContext); - transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), TransportServiceCallback.EMPTY); - transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), TransportServiceCallback.EMPTY); - - deviceSessionContext.setSessionInfo(sessionInfo); - deviceSessionContext.setDeviceInfo(msg.getDeviceInfo()); - deviceSessionContext.setConnected(true); + registerTransportSession(deviceSessionContext, msg); + deviceSessionContext.setSessionTimeoutHandler(() -> { + registerTransportSession(deviceSessionContext, msg); + }); } else { log.warn("[{}] Failed to process device auth", deviceSessionContext.getDeviceId()); } @@ -201,6 +194,21 @@ public class SnmpTransportContext extends TransportContext { }); } + private void registerTransportSession(DeviceSessionContext deviceSessionContext, ValidateDeviceCredentialsResponse msg) { + SessionInfoProto sessionInfo = SessionInfoCreator.create( + msg, SnmpTransportContext.this, UUID.randomUUID() + ); + log.debug("Registering transport session: {}", sessionInfo); + + transportService.registerAsyncSession(sessionInfo, deviceSessionContext); + transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), TransportServiceCallback.EMPTY); + transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), TransportServiceCallback.EMPTY); + + deviceSessionContext.setSessionInfo(sessionInfo); + deviceSessionContext.setDeviceInfo(msg.getDeviceInfo()); + deviceSessionContext.setConnected(true); + } + @EventListener(DeviceUpdatedEvent.class) public void onDeviceUpdatedOrCreated(DeviceUpdatedEvent deviceUpdatedEvent) { Device device = deviceUpdatedEvent.getDevice(); diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduService.java index b395672c00..12fab72a6a 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/PduService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.snmp.service; +import com.google.common.collect.Lists; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.snmp4j.PDU; @@ -25,6 +26,7 @@ import org.snmp4j.smi.OID; import org.snmp4j.smi.OctetString; import org.snmp4j.smi.Variable; import org.snmp4j.smi.VariableBinding; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; import org.thingsboard.server.common.data.kv.DataType; @@ -35,6 +37,7 @@ import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicatio import org.thingsboard.server.queue.util.TbSnmpTransportComponent; import org.thingsboard.server.transport.snmp.session.DeviceSessionContext; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,21 +50,30 @@ import java.util.stream.IntStream; @Service @Slf4j public class PduService { - public PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map values) { - PDU pdu = setUpPdu(sessionContext); - pdu.setType(communicationConfig.getMethod().getCode()); - pdu.addAll(communicationConfig.getAllMappings().stream() - .filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey())) - .map(mapping -> Optional.ofNullable(values.get(mapping.getKey())) - .map(value -> { - Variable variable = toSnmpVariable(value, mapping.getDataType()); - return new VariableBinding(new OID(mapping.getOid()), variable); - }) - .orElseGet(() -> new VariableBinding(new OID(mapping.getOid())))) - .collect(Collectors.toList())); + @Value("${transport.snmp.max_request_oids:100}") + private int maxRequestOids; - return pdu; + public List createPdus(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map values) { + List pdus = new ArrayList<>(); + List allMappings = communicationConfig.getAllMappings(); + + for (List mappings : Lists.partition(allMappings, maxRequestOids)) { + PDU pdu = setUpPdu(sessionContext); + pdu.setType(communicationConfig.getMethod().getCode()); + pdu.addAll(mappings.stream() + .filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey())) + .map(mapping -> Optional.ofNullable(values.get(mapping.getKey())) + .map(value -> { + Variable variable = toSnmpVariable(value, mapping.getDataType()); + return new VariableBinding(new OID(mapping.getOid()), variable); + }) + .orElseGet(() -> new VariableBinding(new OID(mapping.getOid())))) + .collect(Collectors.toList())); + pdus.add(pdu); + } + + return pdus; } public PDU createSingleVariablePdu(DeviceSessionContext sessionContext, SnmpMethod snmpMethod, String oid, String value, DataType dataType) { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 6e68a0bd44..45b0d9ec0a 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -45,7 +45,6 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod; import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig; import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; @@ -161,18 +160,20 @@ public class SnmpTransportService implements TbTransportService { } private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map values) { - PDU request = pduService.createPdu(sessionContext, communicationConfig, values); + List request = pduService.createPdus(sessionContext, communicationConfig, values); RequestInfo requestInfo = new RequestInfo(communicationConfig.getSpec(), communicationConfig.getAllMappings()); sendRequest(sessionContext, request, requestInfo); } - private void sendRequest(DeviceSessionContext sessionContext, PDU request, RequestInfo requestInfo) { - if (request.size() > 0) { - log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), request.getVariableBindings()); - try { - snmp.send(request, sessionContext.getTarget(), requestInfo, sessionContext); - } catch (IOException e) { - log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString()); + private void sendRequest(DeviceSessionContext sessionContext, List request, RequestInfo requestInfo) { + for (PDU pdu : request) { + if (pdu.size() > 0) { + log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), pdu.getVariableBindings()); + try { + snmp.send(pdu, sessionContext.getTarget(), requestInfo, sessionContext); + } catch (IOException e) { + log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString()); + } } } } @@ -216,7 +217,7 @@ public class SnmpTransportService implements TbTransportService { PDU request = pduService.createSingleVariablePdu(sessionContext, snmpMethod, oid, value, dataType); RequestInfo requestInfo = new RequestInfo(toDeviceRpcRequestMsg.getRequestId(), communicationConfig.getSpec(), communicationConfig.getAllMappings()); - sendRequest(sessionContext, request, requestInfo); + sendRequest(sessionContext, List.of(request), requestInfo); } @@ -247,7 +248,7 @@ public class SnmpTransportService implements TbTransportService { JsonObject responseData = responseDataMappers.get(requestInfo.getCommunicationSpec()).map(response, requestInfo); if (responseData.entrySet().isEmpty()) { - log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID()); + log.debug("No values in the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID()); return; } @@ -302,11 +303,7 @@ public class SnmpTransportService implements TbTransportService { } private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { - transportService.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(true) - .setRpcSubscription(true) - .setLastActivityTime(System.currentTimeMillis()) - .build(), TransportServiceCallback.EMPTY); + transportService.reportActivity(sessionInfo); } diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index 2b7c452ca7..fede19824f 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.common.transport.service.DefaultTransportService; import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; @@ -63,6 +64,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S private final AtomicInteger msgIdSeq = new AtomicInteger(0); @Getter private boolean isActive = true; + @Setter + private Runnable sessionTimeoutHandler; @Getter private final List> queryingTasks = new LinkedList<>(); @@ -137,6 +140,11 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Override public void onRemoteSessionCloseCommand(UUID sessionId, SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); + if (sessionCloseNotification.getMessage().equals(DefaultTransportService.SESSION_EXPIRED_MESSAGE)) { + if (sessionTimeoutHandler != null) { + sessionTimeoutHandler.run(); + } + } } @Override diff --git a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java index 5fd7f3bed0..fa49b17f13 100644 --- a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java +++ b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.transport.snmp; -import org.snmp4j.CommandResponderEvent; import org.snmp4j.CommunityTarget; import org.snmp4j.PDU; import org.snmp4j.Snmp; @@ -35,7 +34,6 @@ import org.snmp4j.agent.mo.snmp.SnmpTargetMIB; import org.snmp4j.agent.mo.snmp.StorageType; import org.snmp4j.agent.mo.snmp.VacmMIB; import org.snmp4j.agent.security.MutableVACM; -import org.snmp4j.mp.MPv3; import org.snmp4j.mp.SnmpConstants; import org.snmp4j.security.SecurityLevel; import org.snmp4j.security.SecurityModel; @@ -53,27 +51,11 @@ import org.snmp4j.transport.TransportMappings; import java.io.File; import java.io.IOException; import java.util.Map; -import java.util.function.Consumer; import java.util.stream.Collectors; @SuppressWarnings("deprecation") public class SnmpDeviceSimulatorV2 extends BaseAgent { - public static class RequestProcessor extends CommandProcessor { - private final Consumer processor; - - public RequestProcessor(Consumer processor) { - super(new OctetString(MPv3.createLocalEngineID())); - this.processor = processor; - } - - @Override - public void processPdu(CommandResponderEvent event) { - processor.accept(event); - } - } - - private final Target target; private final Address address; private Snmp snmp; @@ -81,10 +63,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { private final String password; public SnmpDeviceSimulatorV2(int port, String password) throws IOException { - super(new File("conf.agent"), new File("bootCounter.agent"), new RequestProcessor(event -> { - System.out.println("aboba"); - ((Snmp) event.getSource()).cancel(event.getPDU(), event1 -> System.out.println("canceled")); - })); + super(new File("conf.agent"), new File("bootCounter.agent"), new CommandProcessor(new OctetString("12312"))); CommunityTarget target = new CommunityTarget(); target.setCommunity(new OctetString(password)); this.address = GenericAddress.parse("udp:0.0.0.0/" + port); diff --git a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java index bf371058aa..55ed8fb97f 100644 --- a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java +++ b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java @@ -16,6 +16,7 @@ package org.thingsboard.server.transport.snmp; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Scanner; @@ -24,24 +25,12 @@ public class SnmpTestV2 { SnmpDeviceSimulatorV2 device = new SnmpDeviceSimulatorV2(1610, "public"); device.start(); - device.setUpMappings(Map.of( - ".1.3.6.1.2.1.1.1.50", "12", - ".1.3.6.1.2.1.2.1.52", "56", - ".1.3.6.1.2.1.3.1.54", "yes", - ".1.3.6.1.2.1.7.1.58", "" - )); - - -// while (true) { -// new Scanner(System.in).nextLine(); -// device.sendTrap("127.0.0.1", 1062, Map.of(".1.3.6.1.2.87.1.56", "12")); -// System.out.println("sent"); -// } - -// Snmp snmp = new Snmp(device.transportMappings[0]); -// device.snmp.addCommandResponder(event -> { -// System.out.println(event); -// }); + Map mappings = new HashMap<>(); + for (int i = 1; i <= 500; i++) { + String oid = String.format(".1.3.6.1.2.1.%s.1.52", i); + mappings.put(oid, "value_" + i); + } + device.setUpMappings(mappings); new Scanner(System.in).nextLine(); } diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 9f086bcbc5..8fcd4cc2d2 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -93,6 +93,7 @@ transport: parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}" # to configure SNMP to work over UDP or TCP underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" + max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"