From ff6074807ab3b25a571177c3f3c361b01ce188e5 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 6 Feb 2024 13:48:21 +0200 Subject: [PATCH 1/3] SNMP: request chunk sending delay; refactoring --- .../src/main/resources/thingsboard.yml | 7 +- .../transport/snmp/SnmpTransportContext.java | 2 + .../snmp/service/SnmpTransportService.java | 125 +++++--- .../snmp/session/DeviceSessionContext.java | 6 +- .../transport/snmp/session/ScheduledTask.java | 62 ++++ .../transport/snmp/SnmpDeviceSimulatorV2.java | 19 +- .../server/transport/snmp/SnmpTestV2.java | 42 ++- .../snmp-device-profile-transport-config.json | 43 --- .../snmp-device-transport-config-v3.json | 13 - .../snmp-device-transport-config.json | 6 - .../test/resources/snmp_device_profile.json | 289 ++++++++++++++++++ .../service/DefaultTransportService.java | 12 +- .../src/main/resources/tb-snmp-transport.yml | 7 +- 13 files changed, 492 insertions(+), 141 deletions(-) create mode 100644 common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/ScheduledTask.java delete mode 100644 common/transport/snmp/src/test/resources/snmp-device-profile-transport-config.json delete mode 100644 common/transport/snmp/src/test/resources/snmp-device-transport-config-v3.json delete mode 100644 common/transport/snmp/src/test/resources/snmp-device-transport-config.json create mode 100644 common/transport/snmp/src/test/resources/snmp_device_profile.json diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 936f30af60..6a20c3bcfc 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1173,14 +1173,17 @@ transport: bind_port: "${SNMP_BIND_PORT:1620}" response_processing: # parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices - parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}" + parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:4}" # to configure SNMP to work over UDP or TCP underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" - # Batch size to request OID mappings from the device (useful when the device profile has multiple hundreds of OID mappings) + # Maximum size of a PDU (amount of OID mappings in a single SNMP request). The request will be split into multiple PDUs if mappings amount exceeds this number max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}" + # Delay after sending each request chunk (in case the request was split into multiple PDUs due to max_request_oids) + request_chunk_delay_ms: "${SNMP_REQUEST_CHUNK_DELAY_MS:100}" response: # To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types) ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}" + scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}" stats: # Enable/Disable the collection of transport statistics enabled: "${TB_TRANSPORT_STATS_ENABLED:true}" diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java index a59013bbfe..6e30b06a2c 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java @@ -152,12 +152,14 @@ public class SnmpTransportContext extends TransportContext { try { if (!newProfileTransportConfiguration.equals(sessionContext.getProfileTransportConfiguration())) { sessionContext.setProfileTransportConfiguration(newProfileTransportConfiguration); + sessionContext.setDevice(device); sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration); snmpTransportService.cancelQueryingTasks(sessionContext); snmpTransportService.createQueryingTasks(sessionContext); transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null); } else if (!newDeviceTransportConfiguration.equals(sessionContext.getDeviceTransportConfiguration())) { sessionContext.setDeviceTransportConfiguration(newDeviceTransportConfiguration); + sessionContext.setDevice(device); sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration); transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null); } else { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 3a8811df97..26a1ef037e 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -15,6 +15,11 @@ */ package org.thingsboard.server.transport.snmp.service; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import lombok.Builder; @@ -44,6 +49,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TbTransportService; import org.thingsboard.server.common.data.kv.DataType; @@ -53,11 +59,11 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod; import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig; import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; import org.thingsboard.server.transport.snmp.SnmpTransportContext; import org.thingsboard.server.transport.snmp.session.DeviceSessionContext; +import org.thingsboard.server.transport.snmp.session.ScheduledTask; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -71,8 +77,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -80,6 +84,7 @@ import java.util.stream.Collectors; @Service @Slf4j @RequiredArgsConstructor +@SuppressWarnings("UnstableApiUsage") public class SnmpTransportService implements TbTransportService, CommandResponder { private final TransportService transportService; private final PduService pduService; @@ -88,23 +93,27 @@ public class SnmpTransportService implements TbTransportService, CommandResponde @Getter private Snmp snmp; - private ScheduledExecutorService queryingExecutor; - private ExecutorService responseProcessingExecutor; + private ListeningScheduledExecutorService scheduler; + private ExecutorService executor; private final Map responseDataMappers = new EnumMap<>(SnmpCommunicationSpec.class); private final Map responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class); @Value("${transport.snmp.bind_port:1620}") private Integer snmpBindPort; - @Value("${transport.snmp.response_processing.parallelism_level}") - private Integer responseProcessingParallelismLevel; + @Value("${transport.snmp.response_processing.parallelism_level:4}") + private int responseProcessingThreadPoolSize; + @Value("${transport.snmp.scheduler_thread_pool_size:4}") + private int schedulerThreadPoolSize; @Value("${transport.snmp.underlying_protocol}") private String snmpUnderlyingProtocol; + @Value("${transport.snmp.request_chunk_delay_ms:100}") + private int requestChunkDelayMs; @PostConstruct private void init() throws IOException { - queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying")); - responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing"); + scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(schedulerThreadPoolSize, ThingsBoardThreadFactory.forName("snmp-querying"))); + executor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingThreadPoolSize, "snmp-response-processing"); initializeSnmp(); configureResponseDataMappers(); @@ -115,11 +124,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde @PreDestroy public void stop() { - if (queryingExecutor != null) { - queryingExecutor.shutdownNow(); + if (scheduler != null) { + scheduler.shutdownNow(); } - if (responseProcessingExecutor != null) { - responseProcessingExecutor.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); } } @@ -144,38 +153,39 @@ public class SnmpTransportService implements TbTransportService, CommandResponde } public void createQueryingTasks(DeviceSessionContext sessionContext) { - List> queryingTasks = sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream() + sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream() .filter(communicationConfig -> communicationConfig instanceof RepeatingQueryingSnmpCommunicationConfig) - .map(config -> { + .forEach(config -> { RepeatingQueryingSnmpCommunicationConfig repeatingCommunicationConfig = (RepeatingQueryingSnmpCommunicationConfig) config; Long queryingFrequency = repeatingCommunicationConfig.getQueryingFrequencyMs(); - return queryingExecutor.scheduleWithFixedDelay(() -> { + ScheduledTask scheduledTask = new ScheduledTask(); + scheduledTask.init(() -> { try { if (sessionContext.isActive()) { - sendRequest(sessionContext, repeatingCommunicationConfig); + return sendRequest(sessionContext, repeatingCommunicationConfig); } } catch (Exception e) { log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString()); transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), config.getSpec().getLabel(), e); } - }, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS); - }) - .collect(Collectors.toList()); - sessionContext.getQueryingTasks().addAll(queryingTasks); + return Futures.immediateVoidFuture(); + }, queryingFrequency, scheduler); + sessionContext.getQueryingTasks().add(scheduledTask); + }); } public void cancelQueryingTasks(DeviceSessionContext sessionContext) { - sessionContext.getQueryingTasks().forEach(task -> task.cancel(true)); + sessionContext.getQueryingTasks().forEach(ScheduledTask::cancel); sessionContext.getQueryingTasks().clear(); } - private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) { - sendRequest(sessionContext, communicationConfig, Collections.emptyMap()); + private ListenableFuture sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) { + return sendRequest(sessionContext, communicationConfig, Collections.emptyMap()); } - private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map values) { + private ListenableFuture sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map values) { List request = pduService.createPdus(sessionContext, communicationConfig, values); RequestContext requestContext = RequestContext.builder() .communicationSpec(communicationConfig.getSpec()) @@ -183,18 +193,39 @@ public class SnmpTransportService implements TbTransportService, CommandResponde .responseMappings(communicationConfig.getAllMappings()) .requestSize(request.size()) .build(); - sendRequest(sessionContext, request, requestContext); + return sendRequest(sessionContext, request, requestContext); } - private void sendRequest(DeviceSessionContext sessionContext, List request, RequestContext requestContext) { - for (PDU pdu : request) { - log.debug("Executing SNMP request for device {} with {} variable bindings", sessionContext.getDeviceId(), pdu.size()); - try { - snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext); - } catch (IOException e) { - log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString()); - transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e); + private ListenableFuture sendRequest(DeviceSessionContext sessionContext, List request, RequestContext requestContext) { + if (request.size() <= 1 || requestChunkDelayMs == 0) { + for (PDU pdu : request) { + sendPdu(pdu, requestContext, sessionContext); } + return Futures.immediateVoidFuture(); + } + + List> futures = new ArrayList<>(); + for (int i = 0, delay = 0; i < request.size(); i++, delay += requestChunkDelayMs) { + PDU pdu = request.get(i); + if (delay == 0) { + sendPdu(pdu, requestContext, sessionContext); + } else { + ListenableScheduledFuture future = scheduler.schedule(() -> { + sendPdu(pdu, requestContext, sessionContext); + }, delay, TimeUnit.MILLISECONDS); + futures.add(future); + } + } + return Futures.whenAllComplete(futures).call(() -> null, MoreExecutors.directExecutor()); + } + + private void sendPdu(PDU pdu, RequestContext requestContext, DeviceSessionContext sessionContext) { + log.debug("[{}] Sending SNMP request with {} variable bindings to {}", sessionContext.getDeviceId(), pdu.size(), sessionContext.getTarget().getAddress()); + try { + snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext); + } catch (Exception e) { + log.error("[{}] Failed to send SNMP request", sessionContext.getDeviceId(), e); + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e); } } @@ -251,21 +282,19 @@ public class SnmpTransportService implements TbTransportService, CommandResponde ((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext); RequestContext requestContext = (RequestContext) event.getUserObject(); if (event.getError() != null) { - log.warn("SNMP response error: {}", event.getError().toString()); + log.warn("[{}] SNMP response error: {}", sessionContext.getDeviceId(), event.getError().toString()); transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException(event.getError())); return; } PDU responsePdu = event.getResponse(); - if (log.isTraceEnabled()) { - log.trace("Received PDU for device {}: {}", sessionContext.getDeviceId(), responsePdu); - } + log.trace("[{}] Received PDU: {}", sessionContext.getDeviceId(), responsePdu); List response; if (requestContext.getRequestSize() == 1) { if (responsePdu == null) { - log.debug("No response from SNMP device {}, requestId: {}", sessionContext.getDeviceId(), event.getRequest().getRequestID()); if (requestContext.getMethod() == SnmpMethod.GET) { + log.debug("[{}][{}] Empty response from device", sessionContext.getDeviceId(), event.getRequest().getRequestID()); transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException("No response from device")); } return; @@ -281,14 +310,14 @@ public class SnmpTransportService implements TbTransportService, CommandResponde response.add(responsePart); } } - log.debug("All response parts are collected for request to device {}", sessionContext.getDeviceId()); + log.debug("[{}] All {} response parts are collected for request", sessionContext.getDeviceId(), responseParts.size()); } else { - log.trace("Awaiting other response parts for request to device {}", sessionContext.getDeviceId()); + log.trace("[{}] Awaiting other response parts for request", sessionContext.getDeviceId()); return; } } - responseProcessingExecutor.execute(() -> { + executor.execute(() -> { try { processResponse(sessionContext, response, requestContext); } catch (Exception e) { @@ -341,7 +370,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde .method(SnmpMethod.TRAP) .build(); - responseProcessingExecutor.execute(() -> { + executor.execute(() -> { processResponse(sessionContext, List.of(pdu), requestContext); }); } @@ -352,7 +381,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde JsonObject responseData = responseDataMappers.get(requestContext.getCommunicationSpec()).map(response, requestContext); if (responseData.size() == 0) { - log.warn("No values in the SNMP response for device {}", sessionContext.getDeviceId()); + log.warn("[{}] No values in the response", sessionContext.getDeviceId()); throw new IllegalArgumentException("No values in the response"); } @@ -428,11 +457,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde @PreDestroy public void shutdown() { log.info("Stopping SNMP transport!"); - if (queryingExecutor != null) { - queryingExecutor.shutdownNow(); + if (scheduler != null) { + scheduler.shutdownNow(); } - if (responseProcessingExecutor != null) { - responseProcessingExecutor.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); } if (snmp != null) { try { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index 5819c76f74..8ec0c6841d 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -45,7 +45,6 @@ import org.thingsboard.server.transport.snmp.SnmpTransportContext; import java.util.LinkedList; import java.util.List; import java.util.UUID; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @@ -60,7 +59,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Setter private SnmpDeviceTransportConfiguration deviceTransportConfiguration; @Getter - private final Device device; + @Setter + private Device device; @Getter private final TenantId tenantId; @@ -73,7 +73,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S private Runnable sessionTimeoutHandler; @Getter - private final List> queryingTasks = new LinkedList<>(); + private final List queryingTasks = new LinkedList<>(); @Builder public DeviceSessionContext(TenantId tenantId, Device device, DeviceProfile deviceProfile, String token, diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/ScheduledTask.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/ScheduledTask.java new file mode 100644 index 0000000000..43d83e5e2c --- /dev/null +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/ScheduledTask.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.snmp.session; + +import com.google.common.util.concurrent.AsyncCallable; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Data +@Slf4j +public class ScheduledTask { + private ListenableFuture scheduledFuture; + private boolean stopped = false; + + public void init(AsyncCallable task, long delayMs, ScheduledExecutorService scheduler) { + schedule(task, delayMs, scheduler); + } + + private void schedule(AsyncCallable task, long delayMs, ScheduledExecutorService scheduler) { + scheduledFuture = Futures.scheduleAsync(() -> { + if (stopped) { + return Futures.immediateCancelledFuture(); + } + try { + return task.call(); + } catch (Throwable t) { + log.error("Unhandled error in scheduled task", t); + return Futures.immediateFailedFuture(t); + } + }, delayMs, TimeUnit.MILLISECONDS, scheduler); + if (!stopped) { + scheduledFuture.addListener(() -> schedule(task, delayMs, scheduler), MoreExecutors.directExecutor()); + } + } + + public void cancel() { + stopped = true; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + +} diff --git a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java index 261e67bac8..ced8f21288 100644 --- a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java +++ b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpDeviceSimulatorV2.java @@ -58,11 +58,10 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { private final Target target; private final Address address; + private final Map mappings; private Snmp snmp; - private final String password; - - public SnmpDeviceSimulatorV2(int port, String password) throws IOException { + public SnmpDeviceSimulatorV2(int port, String password, Map mappings) throws IOException { super(new File("conf.agent"), new File("bootCounter.agent"), new CommandProcessor(new OctetString("12312"))); CommunityTarget target = new CommunityTarget(); target.setCommunity(new OctetString(password)); @@ -72,7 +71,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { target.setTimeout(1500); target.setVersion(SnmpConstants.version2c); this.target = target; - this.password = password; + this.mappings = mappings; } public void start() throws IOException { @@ -85,13 +84,6 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { snmp = new Snmp(transportMappings[0]); } - public void setUpMappings(Map oidToResponseMappings) { - unregisterManagedObject(getSnmpv2MIB()); - oidToResponseMappings.forEach((oid, response) -> { - registerManagedObject(new MOScalar<>(new OID(oid), MOAccessImpl.ACCESS_READ_WRITE, new OctetString(response))); - }); - } - public void sendTrap(String host, int port, Map values) throws IOException { PDU pdu = new PDU(); pdu.addAll(values.entrySet().stream() @@ -107,6 +99,10 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { @Override protected void registerManagedObjects() { + unregisterManagedObject(getSnmpv2MIB()); + mappings.forEach((oid, response) -> { + registerManagedObject(new MOScalar<>(new OID(oid), MOAccessImpl.ACCESS_READ_WRITE, new OctetString(response))); + }); } protected void registerManagedObject(ManagedObject mo) { @@ -152,6 +148,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent { } protected void unregisterManagedObjects() { + unregisterManagedObject(getSnmpv2MIB()); } protected void addCommunities(SnmpCommunityMIB communityMIB) { diff --git a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java index cb99c85655..c87507f962 100644 --- a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java +++ b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java @@ -15,28 +15,34 @@ */ package org.thingsboard.server.transport.snmp; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.thingsboard.common.util.JacksonUtil; + +import java.io.File; import java.io.IOException; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Scanner; +import java.util.stream.Collectors; public class SnmpTestV2 { private static final Scanner scanner = new Scanner(System.in); public static void main(String[] args) throws IOException { - SnmpDeviceSimulatorV2 client = new SnmpDeviceSimulatorV2(1610, "public"); + Map mappings = new LinkedHashMap<>(); + for (int i = 1; i <= 50; i++) { + String oid = String.format("1.3.6.1.2.1.%s.1.52", i); + mappings.put(oid, "value_" + i); + } - client.start(); - Map mappings = new HashMap<>(); -// for (int i = 1; i <= 500; i++) { -// String oid = String.format(".1.3.6.1.2.1.%s.1.52", i); -// mappings.put(oid, "value_" + i); -// } - mappings.put("1.3.6.1.2.1.266.1.52", "****"); + SnmpDeviceSimulatorV2 device = new SnmpDeviceSimulatorV2(1610, "public", mappings); + device.start(); - client.setUpMappings(mappings); - inputTraps(client); + System.out.println("Hosting the following values:\n" + mappings.entrySet().stream() + .map(entry -> entry.getKey() + " - " + entry.getValue()) + .collect(Collectors.joining("\n"))); scanner.nextLine(); } @@ -53,4 +59,18 @@ public class SnmpTestV2 { } } + private static void updateDeviceProfile(String file) throws Exception { + File profileFile = new File(file); + JsonNode deviceProfile = JacksonUtil.OBJECT_MAPPER.readTree(profileFile); + ArrayNode mappingsJson = (ArrayNode) deviceProfile.at("/profileData/transportConfiguration/communicationConfigs/0/mappings"); + for (int i = 1; i <= 50; i++) { + String oid = String.format(".1.3.6.1.2.1.%s.1.52", i); + mappingsJson.add(JacksonUtil.newObjectNode() + .put("oid", oid) + .put("key", "key_" + i) + .put("dataType", "STRING")); + } + JacksonUtil.OBJECT_MAPPER.writeValue(profileFile, deviceProfile); + } + } diff --git a/common/transport/snmp/src/test/resources/snmp-device-profile-transport-config.json b/common/transport/snmp/src/test/resources/snmp-device-profile-transport-config.json deleted file mode 100644 index f74ebca0bf..0000000000 --- a/common/transport/snmp/src/test/resources/snmp-device-profile-transport-config.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "timeoutMs": 500, - "retries": 0, - "communicationConfigs": [ - { - "spec": "TELEMETRY_QUERYING", - "queryingFrequencyMs": 3000, - "mappings": [ - { - "oid": ".1.3.6.1.2.1.1.1.50", - "key": "temperature", - "dataType": "LONG" - }, - { - "oid": ".1.3.6.1.2.1.2.1.52", - "key": "humidity", - "dataType": "DOUBLE" - } - ] - }, - { - "spec": "CLIENT_ATTRIBUTES_QUERYING", - "queryingFrequencyMs": 5000, - "mappings": [ - { - "oid": ".1.3.6.1.2.1.3.1.54", - "key": "isCool", - "dataType": "STRING" - } - ] - }, - { - "spec": "SHARED_ATTRIBUTES_SETTING", - "mappings": [ - { - "oid": ".1.3.6.1.2.1.7.1.58", - "key": "shared", - "dataType": "STRING" - } - ] - } - ] -} diff --git a/common/transport/snmp/src/test/resources/snmp-device-transport-config-v3.json b/common/transport/snmp/src/test/resources/snmp-device-transport-config-v3.json deleted file mode 100644 index 039e03fa53..0000000000 --- a/common/transport/snmp/src/test/resources/snmp-device-transport-config-v3.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "address": "192.168.3.23", - "port": 1610, - "protocolVersion": "V3", - - "username": "tb-user", - "engineId": "qwertyuioa", - "securityName": "tb-user", - "authenticationProtocol": "SHA_512", - "authenticationPassphrase": "sdfghjkloifgh", - "privacyProtocol": "DES", - "privacyPassphrase": "rtytguijokod" -} \ No newline at end of file diff --git a/common/transport/snmp/src/test/resources/snmp-device-transport-config.json b/common/transport/snmp/src/test/resources/snmp-device-transport-config.json deleted file mode 100644 index c73d817bfb..0000000000 --- a/common/transport/snmp/src/test/resources/snmp-device-transport-config.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "address": "127.0.0.1", - "port": 1610, - "community": "public", - "protocolVersion": "V2C" -} \ No newline at end of file diff --git a/common/transport/snmp/src/test/resources/snmp_device_profile.json b/common/transport/snmp/src/test/resources/snmp_device_profile.json new file mode 100644 index 0000000000..0e72c46ce8 --- /dev/null +++ b/common/transport/snmp/src/test/resources/snmp_device_profile.json @@ -0,0 +1,289 @@ +{ + "name": "SNMP Device Profile", + "description": "", + "image": null, + "type": "DEFAULT", + "transportType": "SNMP", + "provisionType": "DISABLED", + "defaultRuleChainId": null, + "defaultDashboardId": null, + "defaultQueueName": null, + "profileData": { + "configuration": { + "type": "DEFAULT" + }, + "transportConfiguration": { + "type": "SNMP", + "timeoutMs": 500, + "retries": 0, + "communicationConfigs": [ + { + "spec": "TELEMETRY_QUERYING", + "mappings": [ + { + "oid": ".1.3.6.1.2.1.1.1.52", + "key": "key_1", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.2.1.52", + "key": "key_2", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.3.1.52", + "key": "key_3", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.4.1.52", + "key": "key_4", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.5.1.52", + "key": "key_5", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.6.1.52", + "key": "key_6", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.7.1.52", + "key": "key_7", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.8.1.52", + "key": "key_8", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.9.1.52", + "key": "key_9", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.10.1.52", + "key": "key_10", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.11.1.52", + "key": "key_11", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.12.1.52", + "key": "key_12", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.13.1.52", + "key": "key_13", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.14.1.52", + "key": "key_14", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.15.1.52", + "key": "key_15", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.16.1.52", + "key": "key_16", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.17.1.52", + "key": "key_17", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.18.1.52", + "key": "key_18", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.19.1.52", + "key": "key_19", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.20.1.52", + "key": "key_20", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.21.1.52", + "key": "key_21", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.22.1.52", + "key": "key_22", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.23.1.52", + "key": "key_23", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.24.1.52", + "key": "key_24", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.25.1.52", + "key": "key_25", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.26.1.52", + "key": "key_26", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.27.1.52", + "key": "key_27", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.28.1.52", + "key": "key_28", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.29.1.52", + "key": "key_29", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.30.1.52", + "key": "key_30", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.31.1.52", + "key": "key_31", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.32.1.52", + "key": "key_32", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.33.1.52", + "key": "key_33", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.34.1.52", + "key": "key_34", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.35.1.52", + "key": "key_35", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.36.1.52", + "key": "key_36", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.37.1.52", + "key": "key_37", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.38.1.52", + "key": "key_38", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.39.1.52", + "key": "key_39", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.40.1.52", + "key": "key_40", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.41.1.52", + "key": "key_41", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.42.1.52", + "key": "key_42", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.43.1.52", + "key": "key_43", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.44.1.52", + "key": "key_44", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.45.1.52", + "key": "key_45", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.46.1.52", + "key": "key_46", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.47.1.52", + "key": "key_47", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.48.1.52", + "key": "key_48", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.49.1.52", + "key": "key_49", + "dataType": "STRING" + }, + { + "oid": ".1.3.6.1.2.1.50.1.52", + "key": "key_50", + "dataType": "STRING" + } + ], + "queryingFrequencyMs": 5000 + } + ] + }, + "provisionConfiguration": { + "type": "DISABLED", + "provisionDeviceSecret": null + }, + "alarms": null + }, + "provisionDeviceKey": null, + "firmwareId": null, + "softwareId": null, + "defaultEdgeRuleChainId": null, + "default": false +} \ No newline at end of file diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 0d7fbfc332..504b9bdf6d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -788,7 +788,11 @@ public class DefaultTransportService extends TransportActivityManager implements .setSuccess(success) .setError(error != null ? ExceptionUtils.getStackTrace(error) : "")) .build(); - sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + try { + sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + } catch (Exception e) { + log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, deviceId, e); + } } @Override @@ -803,7 +807,11 @@ public class DefaultTransportService extends TransportActivityManager implements .setMethod(method) .setError(ExceptionUtils.getStackTrace(error))) .build(); - sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + try { + sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + } catch (Exception e) { + log.error("[{}][{}] Failed to send error event to core", tenantId, deviceId, e); + } } @Override diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 805ab5a8dd..2e69032cb3 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -128,14 +128,17 @@ transport: bind_port: "${SNMP_BIND_PORT:1620}" response_processing: # parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices - parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}" + parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:4}" # to configure SNMP to work over UDP or TCP underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" - # Batch size to request OID mappings from the device (useful when the device profile has multiple hundreds of OID mappings) + # Maximum size of a PDU (amount of OID mappings in a single SNMP request). The request will be split into multiple PDUs if mappings amount exceeds this number max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}" + # Delay after sending each request chunk (in case the request was split into multiple PDUs due to max_request_oids) + request_chunk_delay_ms: "${SNMP_REQUEST_CHUNK_DELAY_MS:100}" response: # To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types) ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}" + scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}" sessions: # Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device. # The parameter value is in milliseconds. From 1c59cef7eb23892dbb47ceae9bcd98d2cfdba8b6 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 6 Feb 2024 13:59:17 +0200 Subject: [PATCH 2/3] Add scheduler_thread_pool_size param description for SNMP --- application/src/main/resources/thingsboard.yml | 1 + transport/snmp/src/main/resources/tb-snmp-transport.yml | 1 + 2 files changed, 2 insertions(+) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6a20c3bcfc..535d6189c8 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1183,6 +1183,7 @@ transport: response: # To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types) ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}" + # Thread pool size for scheduler that executes device querying tasks scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}" stats: # Enable/Disable the collection of transport statistics diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 2e69032cb3..a40374e1f6 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -138,6 +138,7 @@ transport: response: # To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types) ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}" + # Thread pool size for scheduler that executes device querying tasks scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}" sessions: # Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device. From cd46ebe53ae7f83163020d7ac986f4b0baa359fa Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 6 Feb 2024 16:57:52 +0200 Subject: [PATCH 3/3] Fix SNMP traps processing --- .../snmp/service/SnmpTransportService.java | 39 +++++++++++-------- .../service/DefaultTransportService.java | 2 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 26a1ef037e..3dc293b70b 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -37,7 +37,7 @@ import org.snmp4j.mp.MPv3; import org.snmp4j.security.SecurityModels; import org.snmp4j.security.SecurityProtocols; import org.snmp4j.security.USM; -import org.snmp4j.smi.Address; +import org.snmp4j.smi.IpAddress; import org.snmp4j.smi.OctetString; import org.snmp4j.smi.TcpAddress; import org.snmp4j.smi.UdpAddress; @@ -327,24 +327,31 @@ public class SnmpTransportService implements TbTransportService, CommandResponde } /* - * SNMP notifications handler - * - * TODO: add check for host uniqueness when saving device (for backward compatibility - only for the ones using from-device RPC requests) - * - * NOTE: SNMP TRAPs support won't work properly when there is more than one SNMP transport, - * due to load-balancing of requests from devices: session might not be on this instance - * */ + * SNMP notifications handler + * + * TODO: add check for host uniqueness when saving device (for backward compatibility - only for the ones using from-device RPC requests) + * + * NOTE: SNMP TRAPs support won't work properly when there is more than one SNMP transport, + * due to load-balancing of requests from devices: session might not be on this instance + * */ @Override public void processPdu(CommandResponderEvent event) { - Address sourceAddress = event.getPeerAddress(); - DeviceSessionContext sessionContext = transportContext.getSessions().stream() - .filter(session -> session.getTarget().getAddress().equals(sourceAddress)) - .findFirst().orElse(null); - if (sessionContext == null) { - log.warn("SNMP TRAP processing failed: couldn't find device session for address {}", sourceAddress); + IpAddress sourceAddress = (IpAddress) event.getPeerAddress(); + List sessions = transportContext.getSessions().stream() + .filter(session -> ((IpAddress) session.getTarget().getAddress()).getInetAddress().equals(sourceAddress.getInetAddress())) + .collect(Collectors.toList()); + if (sessions.isEmpty()) { + log.warn("Couldn't find device session for SNMP TRAP for address {}", sourceAddress); + return; + } else if (sessions.size() > 1) { + for (DeviceSessionContext sessionContext : sessions) { + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), SnmpCommunicationSpec.TO_SERVER_RPC_REQUEST.getLabel(), + new IllegalStateException("Found multiple devices for host " + sourceAddress.getInetAddress().getHostAddress())); + } return; } + DeviceSessionContext sessionContext = sessions.get(0); try { processIncomingTrap(sessionContext, event); } catch (Throwable e) { @@ -356,11 +363,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde private void processIncomingTrap(DeviceSessionContext sessionContext, CommandResponderEvent event) { PDU pdu = event.getPDU(); if (pdu == null) { - log.warn("Got empty trap from device {}", sessionContext.getDeviceId()); + log.warn("[{}] Received empty SNMP trap", sessionContext.getDeviceId()); throw new IllegalArgumentException("Received TRAP with no data"); } - log.debug("Processing SNMP trap from device {} (PDU: {}}", sessionContext.getDeviceId(), pdu); + log.debug("[{}] Processing SNMP trap: {}", sessionContext.getDeviceId(), pdu); SnmpCommunicationConfig communicationConfig = sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream() .filter(config -> config.getSpec() == SnmpCommunicationSpec.TO_SERVER_RPC_REQUEST).findFirst() .orElseThrow(() -> new IllegalArgumentException("No config found for to-server RPC requests")); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 504b9bdf6d..14f7bfffff 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -805,7 +805,7 @@ public class DefaultTransportService extends TransportActivityManager implements .setEntityIdLSB(deviceId.getId().getLeastSignificantBits()) .setServiceId(serviceInfoProvider.getServiceId()) .setMethod(method) - .setError(ExceptionUtils.getStackTrace(error))) + .setError(ExceptionUtils.getRootCauseMessage(error))) .build(); try { sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);