Merge pull request #10137 from thingsboard/feature/snmp-request-delay

SNMP: delay between sending request chunks; traps processing fixes
This commit is contained in:
Andrew Shvayka 2024-02-08 13:24:01 +02:00 committed by GitHub
commit f668ce9661
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 518 additions and 158 deletions

View File

@ -1173,14 +1173,18 @@ transport:
bind_port: "${SNMP_BIND_PORT:1620}"
response_processing:
# parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:4}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
# Batch size to request OID mappings from the device (useful when the device profile has multiple hundreds of OID mappings)
# Maximum size of a PDU (amount of OID mappings in a single SNMP request). The request will be split into multiple PDUs if mappings amount exceeds this number
max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}"
# Delay after sending each request chunk (in case the request was split into multiple PDUs due to max_request_oids)
request_chunk_delay_ms: "${SNMP_REQUEST_CHUNK_DELAY_MS:100}"
response:
# To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types)
ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}"
# Thread pool size for scheduler that executes device querying tasks
scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}"
stats:
# Enable/Disable the collection of transport statistics
enabled: "${TB_TRANSPORT_STATS_ENABLED:true}"

View File

@ -152,12 +152,14 @@ public class SnmpTransportContext extends TransportContext {
try {
if (!newProfileTransportConfiguration.equals(sessionContext.getProfileTransportConfiguration())) {
sessionContext.setProfileTransportConfiguration(newProfileTransportConfiguration);
sessionContext.setDevice(device);
sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
snmpTransportService.cancelQueryingTasks(sessionContext);
snmpTransportService.createQueryingTasks(sessionContext);
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null);
} else if (!newDeviceTransportConfiguration.equals(sessionContext.getDeviceTransportConfiguration())) {
sessionContext.setDeviceTransportConfiguration(newDeviceTransportConfiguration);
sessionContext.setDevice(device);
sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null);
} else {

View File

@ -15,6 +15,11 @@
*/
package org.thingsboard.server.transport.snmp.service;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import lombok.Builder;
@ -32,7 +37,7 @@ import org.snmp4j.mp.MPv3;
import org.snmp4j.security.SecurityModels;
import org.snmp4j.security.SecurityProtocols;
import org.snmp4j.security.USM;
import org.snmp4j.smi.Address;
import org.snmp4j.smi.IpAddress;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.TcpAddress;
import org.snmp4j.smi.UdpAddress;
@ -44,6 +49,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.kv.DataType;
@ -53,11 +59,11 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.SnmpTransportContext;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
import org.thingsboard.server.transport.snmp.session.ScheduledTask;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -71,8 +77,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -80,6 +84,7 @@ import java.util.stream.Collectors;
@Service
@Slf4j
@RequiredArgsConstructor
@SuppressWarnings("UnstableApiUsage")
public class SnmpTransportService implements TbTransportService, CommandResponder {
private final TransportService transportService;
private final PduService pduService;
@ -88,23 +93,27 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
@Getter
private Snmp snmp;
private ScheduledExecutorService queryingExecutor;
private ExecutorService responseProcessingExecutor;
private ListeningScheduledExecutorService scheduler;
private ExecutorService executor;
private final Map<SnmpCommunicationSpec, ResponseDataMapper> responseDataMappers = new EnumMap<>(SnmpCommunicationSpec.class);
private final Map<SnmpCommunicationSpec, ResponseProcessor> responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class);
@Value("${transport.snmp.bind_port:1620}")
private Integer snmpBindPort;
@Value("${transport.snmp.response_processing.parallelism_level}")
private Integer responseProcessingParallelismLevel;
@Value("${transport.snmp.response_processing.parallelism_level:4}")
private int responseProcessingThreadPoolSize;
@Value("${transport.snmp.scheduler_thread_pool_size:4}")
private int schedulerThreadPoolSize;
@Value("${transport.snmp.underlying_protocol}")
private String snmpUnderlyingProtocol;
@Value("${transport.snmp.request_chunk_delay_ms:100}")
private int requestChunkDelayMs;
@PostConstruct
private void init() throws IOException {
queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying"));
responseProcessingExecutor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingParallelismLevel, "snmp-response-processing");
scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(schedulerThreadPoolSize, ThingsBoardThreadFactory.forName("snmp-querying")));
executor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingThreadPoolSize, "snmp-response-processing");
initializeSnmp();
configureResponseDataMappers();
@ -115,11 +124,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
@PreDestroy
public void stop() {
if (queryingExecutor != null) {
queryingExecutor.shutdownNow();
if (scheduler != null) {
scheduler.shutdownNow();
}
if (responseProcessingExecutor != null) {
responseProcessingExecutor.shutdownNow();
if (executor != null) {
executor.shutdownNow();
}
}
@ -144,38 +153,39 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
}
public void createQueryingTasks(DeviceSessionContext sessionContext) {
List<ScheduledFuture<?>> queryingTasks = sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream()
sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream()
.filter(communicationConfig -> communicationConfig instanceof RepeatingQueryingSnmpCommunicationConfig)
.map(config -> {
.forEach(config -> {
RepeatingQueryingSnmpCommunicationConfig repeatingCommunicationConfig = (RepeatingQueryingSnmpCommunicationConfig) config;
Long queryingFrequency = repeatingCommunicationConfig.getQueryingFrequencyMs();
return queryingExecutor.scheduleWithFixedDelay(() -> {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.init(() -> {
try {
if (sessionContext.isActive()) {
sendRequest(sessionContext, repeatingCommunicationConfig);
return sendRequest(sessionContext, repeatingCommunicationConfig);
}
} catch (Exception e) {
log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), config.getSpec().getLabel(), e);
}
}, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS);
})
.collect(Collectors.toList());
sessionContext.getQueryingTasks().addAll(queryingTasks);
return Futures.immediateVoidFuture();
}, queryingFrequency, scheduler);
sessionContext.getQueryingTasks().add(scheduledTask);
});
}
public void cancelQueryingTasks(DeviceSessionContext sessionContext) {
sessionContext.getQueryingTasks().forEach(task -> task.cancel(true));
sessionContext.getQueryingTasks().forEach(ScheduledTask::cancel);
sessionContext.getQueryingTasks().clear();
}
private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {
sendRequest(sessionContext, communicationConfig, Collections.emptyMap());
private ListenableFuture<Void> sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {
return sendRequest(sessionContext, communicationConfig, Collections.emptyMap());
}
private void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
private ListenableFuture<Void> sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
List<PDU> request = pduService.createPdus(sessionContext, communicationConfig, values);
RequestContext requestContext = RequestContext.builder()
.communicationSpec(communicationConfig.getSpec())
@ -183,18 +193,39 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
.responseMappings(communicationConfig.getAllMappings())
.requestSize(request.size())
.build();
sendRequest(sessionContext, request, requestContext);
return sendRequest(sessionContext, request, requestContext);
}
private void sendRequest(DeviceSessionContext sessionContext, List<PDU> request, RequestContext requestContext) {
for (PDU pdu : request) {
log.debug("Executing SNMP request for device {} with {} variable bindings", sessionContext.getDeviceId(), pdu.size());
try {
snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext);
} catch (IOException e) {
log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e);
private ListenableFuture<Void> sendRequest(DeviceSessionContext sessionContext, List<PDU> request, RequestContext requestContext) {
if (request.size() <= 1 || requestChunkDelayMs == 0) {
for (PDU pdu : request) {
sendPdu(pdu, requestContext, sessionContext);
}
return Futures.immediateVoidFuture();
}
List<ListenableFuture<?>> futures = new ArrayList<>();
for (int i = 0, delay = 0; i < request.size(); i++, delay += requestChunkDelayMs) {
PDU pdu = request.get(i);
if (delay == 0) {
sendPdu(pdu, requestContext, sessionContext);
} else {
ListenableScheduledFuture<?> future = scheduler.schedule(() -> {
sendPdu(pdu, requestContext, sessionContext);
}, delay, TimeUnit.MILLISECONDS);
futures.add(future);
}
}
return Futures.whenAllComplete(futures).call(() -> null, MoreExecutors.directExecutor());
}
private void sendPdu(PDU pdu, RequestContext requestContext, DeviceSessionContext sessionContext) {
log.debug("[{}] Sending SNMP request with {} variable bindings to {}", sessionContext.getDeviceId(), pdu.size(), sessionContext.getTarget().getAddress());
try {
snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext);
} catch (Exception e) {
log.error("[{}] Failed to send SNMP request", sessionContext.getDeviceId(), e);
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e);
}
}
@ -251,21 +282,19 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext);
RequestContext requestContext = (RequestContext) event.getUserObject();
if (event.getError() != null) {
log.warn("SNMP response error: {}", event.getError().toString());
log.warn("[{}] SNMP response error: {}", sessionContext.getDeviceId(), event.getError().toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException(event.getError()));
return;
}
PDU responsePdu = event.getResponse();
if (log.isTraceEnabled()) {
log.trace("Received PDU for device {}: {}", sessionContext.getDeviceId(), responsePdu);
}
log.trace("[{}] Received PDU: {}", sessionContext.getDeviceId(), responsePdu);
List<PDU> response;
if (requestContext.getRequestSize() == 1) {
if (responsePdu == null) {
log.debug("No response from SNMP device {}, requestId: {}", sessionContext.getDeviceId(), event.getRequest().getRequestID());
if (requestContext.getMethod() == SnmpMethod.GET) {
log.debug("[{}][{}] Empty response from device", sessionContext.getDeviceId(), event.getRequest().getRequestID());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException("No response from device"));
}
return;
@ -281,14 +310,14 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
response.add(responsePart);
}
}
log.debug("All response parts are collected for request to device {}", sessionContext.getDeviceId());
log.debug("[{}] All {} response parts are collected for request", sessionContext.getDeviceId(), responseParts.size());
} else {
log.trace("Awaiting other response parts for request to device {}", sessionContext.getDeviceId());
log.trace("[{}] Awaiting other response parts for request", sessionContext.getDeviceId());
return;
}
}
responseProcessingExecutor.execute(() -> {
executor.execute(() -> {
try {
processResponse(sessionContext, response, requestContext);
} catch (Exception e) {
@ -298,24 +327,31 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
}
/*
* SNMP notifications handler
*
* TODO: add check for host uniqueness when saving device (for backward compatibility - only for the ones using from-device RPC requests)
*
* NOTE: SNMP TRAPs support won't work properly when there is more than one SNMP transport,
* due to load-balancing of requests from devices: session might not be on this instance
* */
* SNMP notifications handler
*
* TODO: add check for host uniqueness when saving device (for backward compatibility - only for the ones using from-device RPC requests)
*
* NOTE: SNMP TRAPs support won't work properly when there is more than one SNMP transport,
* due to load-balancing of requests from devices: session might not be on this instance
* */
@Override
public void processPdu(CommandResponderEvent event) {
Address sourceAddress = event.getPeerAddress();
DeviceSessionContext sessionContext = transportContext.getSessions().stream()
.filter(session -> session.getTarget().getAddress().equals(sourceAddress))
.findFirst().orElse(null);
if (sessionContext == null) {
log.warn("SNMP TRAP processing failed: couldn't find device session for address {}", sourceAddress);
IpAddress sourceAddress = (IpAddress) event.getPeerAddress();
List<DeviceSessionContext> sessions = transportContext.getSessions().stream()
.filter(session -> ((IpAddress) session.getTarget().getAddress()).getInetAddress().equals(sourceAddress.getInetAddress()))
.collect(Collectors.toList());
if (sessions.isEmpty()) {
log.warn("Couldn't find device session for SNMP TRAP for address {}", sourceAddress);
return;
} else if (sessions.size() > 1) {
for (DeviceSessionContext sessionContext : sessions) {
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), SnmpCommunicationSpec.TO_SERVER_RPC_REQUEST.getLabel(),
new IllegalStateException("Found multiple devices for host " + sourceAddress.getInetAddress().getHostAddress()));
}
return;
}
DeviceSessionContext sessionContext = sessions.get(0);
try {
processIncomingTrap(sessionContext, event);
} catch (Throwable e) {
@ -327,11 +363,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
private void processIncomingTrap(DeviceSessionContext sessionContext, CommandResponderEvent event) {
PDU pdu = event.getPDU();
if (pdu == null) {
log.warn("Got empty trap from device {}", sessionContext.getDeviceId());
log.warn("[{}] Received empty SNMP trap", sessionContext.getDeviceId());
throw new IllegalArgumentException("Received TRAP with no data");
}
log.debug("Processing SNMP trap from device {} (PDU: {}}", sessionContext.getDeviceId(), pdu);
log.debug("[{}] Processing SNMP trap: {}", sessionContext.getDeviceId(), pdu);
SnmpCommunicationConfig communicationConfig = sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream()
.filter(config -> config.getSpec() == SnmpCommunicationSpec.TO_SERVER_RPC_REQUEST).findFirst()
.orElseThrow(() -> new IllegalArgumentException("No config found for to-server RPC requests"));
@ -341,7 +377,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
.method(SnmpMethod.TRAP)
.build();
responseProcessingExecutor.execute(() -> {
executor.execute(() -> {
processResponse(sessionContext, List.of(pdu), requestContext);
});
}
@ -352,7 +388,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
JsonObject responseData = responseDataMappers.get(requestContext.getCommunicationSpec()).map(response, requestContext);
if (responseData.size() == 0) {
log.warn("No values in the SNMP response for device {}", sessionContext.getDeviceId());
log.warn("[{}] No values in the response", sessionContext.getDeviceId());
throw new IllegalArgumentException("No values in the response");
}
@ -428,11 +464,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
@PreDestroy
public void shutdown() {
log.info("Stopping SNMP transport!");
if (queryingExecutor != null) {
queryingExecutor.shutdownNow();
if (scheduler != null) {
scheduler.shutdownNow();
}
if (responseProcessingExecutor != null) {
responseProcessingExecutor.shutdownNow();
if (executor != null) {
executor.shutdownNow();
}
if (snmp != null) {
try {

View File

@ -45,7 +45,6 @@ import org.thingsboard.server.transport.snmp.SnmpTransportContext;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ -60,7 +59,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Setter
private SnmpDeviceTransportConfiguration deviceTransportConfiguration;
@Getter
private final Device device;
@Setter
private Device device;
@Getter
private final TenantId tenantId;
@ -73,7 +73,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
private Runnable sessionTimeoutHandler;
@Getter
private final List<ScheduledFuture<?>> queryingTasks = new LinkedList<>();
private final List<ScheduledTask> queryingTasks = new LinkedList<>();
@Builder
public DeviceSessionContext(TenantId tenantId, Device device, DeviceProfile deviceProfile, String token,

View File

@ -0,0 +1,62 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.snmp.session;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Data
@Slf4j
public class ScheduledTask {
private ListenableFuture<?> scheduledFuture;
private boolean stopped = false;
public void init(AsyncCallable<Void> task, long delayMs, ScheduledExecutorService scheduler) {
schedule(task, delayMs, scheduler);
}
private void schedule(AsyncCallable<Void> task, long delayMs, ScheduledExecutorService scheduler) {
scheduledFuture = Futures.scheduleAsync(() -> {
if (stopped) {
return Futures.immediateCancelledFuture();
}
try {
return task.call();
} catch (Throwable t) {
log.error("Unhandled error in scheduled task", t);
return Futures.immediateFailedFuture(t);
}
}, delayMs, TimeUnit.MILLISECONDS, scheduler);
if (!stopped) {
scheduledFuture.addListener(() -> schedule(task, delayMs, scheduler), MoreExecutors.directExecutor());
}
}
public void cancel() {
stopped = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
}

View File

@ -58,11 +58,10 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
private final Target target;
private final Address address;
private final Map<String, String> mappings;
private Snmp snmp;
private final String password;
public SnmpDeviceSimulatorV2(int port, String password) throws IOException {
public SnmpDeviceSimulatorV2(int port, String password, Map<String, String> mappings) throws IOException {
super(new File("conf.agent"), new File("bootCounter.agent"), new CommandProcessor(new OctetString("12312")));
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString(password));
@ -72,7 +71,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
target.setTimeout(1500);
target.setVersion(SnmpConstants.version2c);
this.target = target;
this.password = password;
this.mappings = mappings;
}
public void start() throws IOException {
@ -85,13 +84,6 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
snmp = new Snmp(transportMappings[0]);
}
public void setUpMappings(Map<String, String> oidToResponseMappings) {
unregisterManagedObject(getSnmpv2MIB());
oidToResponseMappings.forEach((oid, response) -> {
registerManagedObject(new MOScalar<>(new OID(oid), MOAccessImpl.ACCESS_READ_WRITE, new OctetString(response)));
});
}
public void sendTrap(String host, int port, Map<String, String> values) throws IOException {
PDU pdu = new PDU();
pdu.addAll(values.entrySet().stream()
@ -107,6 +99,10 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
@Override
protected void registerManagedObjects() {
unregisterManagedObject(getSnmpv2MIB());
mappings.forEach((oid, response) -> {
registerManagedObject(new MOScalar<>(new OID(oid), MOAccessImpl.ACCESS_READ_WRITE, new OctetString(response)));
});
}
protected void registerManagedObject(ManagedObject mo) {
@ -152,6 +148,7 @@ public class SnmpDeviceSimulatorV2 extends BaseAgent {
}
protected void unregisterManagedObjects() {
unregisterManagedObject(getSnmpv2MIB());
}
protected void addCommunities(SnmpCommunityMIB communityMIB) {

View File

@ -15,28 +15,34 @@
*/
package org.thingsboard.server.transport.snmp;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.thingsboard.common.util.JacksonUtil;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.stream.Collectors;
public class SnmpTestV2 {
private static final Scanner scanner = new Scanner(System.in);
public static void main(String[] args) throws IOException {
SnmpDeviceSimulatorV2 client = new SnmpDeviceSimulatorV2(1610, "public");
Map<String, String> mappings = new LinkedHashMap<>();
for (int i = 1; i <= 50; i++) {
String oid = String.format("1.3.6.1.2.1.%s.1.52", i);
mappings.put(oid, "value_" + i);
}
client.start();
Map<String, String> mappings = new HashMap<>();
// for (int i = 1; i <= 500; i++) {
// String oid = String.format(".1.3.6.1.2.1.%s.1.52", i);
// mappings.put(oid, "value_" + i);
// }
mappings.put("1.3.6.1.2.1.266.1.52", "****");
SnmpDeviceSimulatorV2 device = new SnmpDeviceSimulatorV2(1610, "public", mappings);
device.start();
client.setUpMappings(mappings);
inputTraps(client);
System.out.println("Hosting the following values:\n" + mappings.entrySet().stream()
.map(entry -> entry.getKey() + " - " + entry.getValue())
.collect(Collectors.joining("\n")));
scanner.nextLine();
}
@ -53,4 +59,18 @@ public class SnmpTestV2 {
}
}
private static void updateDeviceProfile(String file) throws Exception {
File profileFile = new File(file);
JsonNode deviceProfile = JacksonUtil.OBJECT_MAPPER.readTree(profileFile);
ArrayNode mappingsJson = (ArrayNode) deviceProfile.at("/profileData/transportConfiguration/communicationConfigs/0/mappings");
for (int i = 1; i <= 50; i++) {
String oid = String.format(".1.3.6.1.2.1.%s.1.52", i);
mappingsJson.add(JacksonUtil.newObjectNode()
.put("oid", oid)
.put("key", "key_" + i)
.put("dataType", "STRING"));
}
JacksonUtil.OBJECT_MAPPER.writeValue(profileFile, deviceProfile);
}
}

View File

@ -1,43 +0,0 @@
{
"timeoutMs": 500,
"retries": 0,
"communicationConfigs": [
{
"spec": "TELEMETRY_QUERYING",
"queryingFrequencyMs": 3000,
"mappings": [
{
"oid": ".1.3.6.1.2.1.1.1.50",
"key": "temperature",
"dataType": "LONG"
},
{
"oid": ".1.3.6.1.2.1.2.1.52",
"key": "humidity",
"dataType": "DOUBLE"
}
]
},
{
"spec": "CLIENT_ATTRIBUTES_QUERYING",
"queryingFrequencyMs": 5000,
"mappings": [
{
"oid": ".1.3.6.1.2.1.3.1.54",
"key": "isCool",
"dataType": "STRING"
}
]
},
{
"spec": "SHARED_ATTRIBUTES_SETTING",
"mappings": [
{
"oid": ".1.3.6.1.2.1.7.1.58",
"key": "shared",
"dataType": "STRING"
}
]
}
]
}

View File

@ -1,13 +0,0 @@
{
"address": "192.168.3.23",
"port": 1610,
"protocolVersion": "V3",
"username": "tb-user",
"engineId": "qwertyuioa",
"securityName": "tb-user",
"authenticationProtocol": "SHA_512",
"authenticationPassphrase": "sdfghjkloifgh",
"privacyProtocol": "DES",
"privacyPassphrase": "rtytguijokod"
}

View File

@ -1,6 +0,0 @@
{
"address": "127.0.0.1",
"port": 1610,
"community": "public",
"protocolVersion": "V2C"
}

View File

@ -0,0 +1,289 @@
{
"name": "SNMP Device Profile",
"description": "",
"image": null,
"type": "DEFAULT",
"transportType": "SNMP",
"provisionType": "DISABLED",
"defaultRuleChainId": null,
"defaultDashboardId": null,
"defaultQueueName": null,
"profileData": {
"configuration": {
"type": "DEFAULT"
},
"transportConfiguration": {
"type": "SNMP",
"timeoutMs": 500,
"retries": 0,
"communicationConfigs": [
{
"spec": "TELEMETRY_QUERYING",
"mappings": [
{
"oid": ".1.3.6.1.2.1.1.1.52",
"key": "key_1",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.2.1.52",
"key": "key_2",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.3.1.52",
"key": "key_3",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.4.1.52",
"key": "key_4",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.5.1.52",
"key": "key_5",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.6.1.52",
"key": "key_6",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.7.1.52",
"key": "key_7",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.8.1.52",
"key": "key_8",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.9.1.52",
"key": "key_9",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.10.1.52",
"key": "key_10",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.11.1.52",
"key": "key_11",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.12.1.52",
"key": "key_12",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.13.1.52",
"key": "key_13",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.14.1.52",
"key": "key_14",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.15.1.52",
"key": "key_15",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.16.1.52",
"key": "key_16",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.17.1.52",
"key": "key_17",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.18.1.52",
"key": "key_18",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.19.1.52",
"key": "key_19",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.20.1.52",
"key": "key_20",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.21.1.52",
"key": "key_21",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.22.1.52",
"key": "key_22",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.23.1.52",
"key": "key_23",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.24.1.52",
"key": "key_24",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.25.1.52",
"key": "key_25",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.26.1.52",
"key": "key_26",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.27.1.52",
"key": "key_27",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.28.1.52",
"key": "key_28",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.29.1.52",
"key": "key_29",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.30.1.52",
"key": "key_30",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.31.1.52",
"key": "key_31",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.32.1.52",
"key": "key_32",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.33.1.52",
"key": "key_33",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.34.1.52",
"key": "key_34",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.35.1.52",
"key": "key_35",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.36.1.52",
"key": "key_36",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.37.1.52",
"key": "key_37",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.38.1.52",
"key": "key_38",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.39.1.52",
"key": "key_39",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.40.1.52",
"key": "key_40",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.41.1.52",
"key": "key_41",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.42.1.52",
"key": "key_42",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.43.1.52",
"key": "key_43",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.44.1.52",
"key": "key_44",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.45.1.52",
"key": "key_45",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.46.1.52",
"key": "key_46",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.47.1.52",
"key": "key_47",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.48.1.52",
"key": "key_48",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.49.1.52",
"key": "key_49",
"dataType": "STRING"
},
{
"oid": ".1.3.6.1.2.1.50.1.52",
"key": "key_50",
"dataType": "STRING"
}
],
"queryingFrequencyMs": 5000
}
]
},
"provisionConfiguration": {
"type": "DISABLED",
"provisionDeviceSecret": null
},
"alarms": null
},
"provisionDeviceKey": null,
"firmwareId": null,
"softwareId": null,
"defaultEdgeRuleChainId": null,
"default": false
}

View File

@ -788,7 +788,11 @@ public class DefaultTransportService extends TransportActivityManager implements
.setSuccess(success)
.setError(error != null ? ExceptionUtils.getStackTrace(error) : ""))
.build();
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
try {
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
} catch (Exception e) {
log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, deviceId, e);
}
}
@Override
@ -801,9 +805,13 @@ public class DefaultTransportService extends TransportActivityManager implements
.setEntityIdLSB(deviceId.getId().getLeastSignificantBits())
.setServiceId(serviceInfoProvider.getServiceId())
.setMethod(method)
.setError(ExceptionUtils.getStackTrace(error)))
.setError(ExceptionUtils.getRootCauseMessage(error)))
.build();
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
try {
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
} catch (Exception e) {
log.error("[{}][{}] Failed to send error event to core", tenantId, deviceId, e);
}
}
@Override

View File

@ -128,14 +128,18 @@ transport:
bind_port: "${SNMP_BIND_PORT:1620}"
response_processing:
# parallelism level for executor (workStealingPool) that is responsible for handling responses from SNMP devices
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:20}"
parallelism_level: "${SNMP_RESPONSE_PROCESSING_PARALLELISM_LEVEL:4}"
# to configure SNMP to work over UDP or TCP
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
# Batch size to request OID mappings from the device (useful when the device profile has multiple hundreds of OID mappings)
# Maximum size of a PDU (amount of OID mappings in a single SNMP request). The request will be split into multiple PDUs if mappings amount exceeds this number
max_request_oids: "${SNMP_MAX_REQUEST_OIDS:100}"
# Delay after sending each request chunk (in case the request was split into multiple PDUs due to max_request_oids)
request_chunk_delay_ms: "${SNMP_REQUEST_CHUNK_DELAY_MS:100}"
response:
# To ignore SNMP response values that do not match the data type of the configured OID mapping (by default false - will throw an error if any value of the response not match configured data types)
ignore_type_cast_errors: "${SNMP_RESPONSE_IGNORE_TYPE_CAST_ERRORS:false}"
# Thread pool size for scheduler that executes device querying tasks
scheduler_thread_pool_size: "${SNMP_SCHEDULER_THREAD_POOL_SIZE:4}"
sessions:
# Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device.
# The parameter value is in milliseconds.