diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpDeviceProfileKvMapping.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpMapping.java similarity index 90% rename from common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpDeviceProfileKvMapping.java rename to common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpMapping.java index 6eff7cbd34..6bd0ca6b4d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpDeviceProfileKvMapping.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpMapping.java @@ -18,11 +18,10 @@ package org.thingsboard.server.common.data.device.profile; import lombok.Data; import org.thingsboard.server.common.data.kv.DataType; -//TODO: rename class @Data -public class SnmpDeviceProfileKvMapping { - private String key; - private DataType type; - private String method; +public class SnmpMapping { private String oid; + private String method; + private String key; + private DataType dataType; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpProfileTransportConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpProfileTransportConfiguration.java index 50f6c4351c..fb6b46fbb6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpProfileTransportConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/device/profile/SnmpProfileTransportConfiguration.java @@ -29,8 +29,8 @@ public class SnmpProfileTransportConfiguration implements DeviceProfileTransport private int pollPeriodMs; private int timeoutMs; private int retries; - private List attributes; - private List telemetry; + private List attributesMappings; + private List telemetryMappings; @Override public DeviceTransportType getType() { @@ -38,9 +38,9 @@ public class SnmpProfileTransportConfiguration implements DeviceProfileTransport } @JsonIgnore - public List getKvMappings() { - if (attributes != null && telemetry != null) { - return Stream.concat(attributes.stream(), telemetry.stream()).collect(Collectors.toList()); + public List getAllMappings() { + if (attributesMappings != null && telemetryMappings != null) { + return Stream.concat(attributesMappings.stream(), telemetryMappings.stream()).collect(Collectors.toList()); } else { return Collections.emptyList(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 3ba360cd8e..6988bef9e1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -20,8 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.TbTransportService; @@ -31,6 +29,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; +import org.thingsboard.server.queue.util.AfterContextReady; import javax.annotation.PostConstruct; import java.net.InetAddress; @@ -109,7 +108,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { serviceInfo = builder.build(); } - @EventListener(ContextRefreshedEvent.class) + @AfterContextReady public void setTransports() { serviceInfo = ServiceInfo.newBuilder(serviceInfo) .addAllTransports(getTransportServices().stream() diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterContextReady.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterContextReady.java new file mode 100644 index 0000000000..5c68a7609a --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterContextReady.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.springframework.context.event.ContextRefreshedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.annotation.AliasFor; +import org.springframework.core.annotation.Order; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@EventListener(ContextRefreshedEvent.class) +@Order +public @interface AfterContextReady { + @AliasFor(annotation = Order.class, attribute = "value") + int order() default Integer.MAX_VALUE; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java new file mode 100644 index 0000000000..5d2e94ee68 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.util; + +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.annotation.AliasFor; +import org.springframework.core.annotation.Order; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@EventListener(ApplicationReadyEvent.class) +@Order +public @interface AfterStartUp { + @AliasFor(annotation = Order.class, attribute = "value") + int order() default Integer.MAX_VALUE; +} diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java index a9fa8380fa..df2582005a 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java @@ -20,16 +20,14 @@ import lombok.extern.slf4j.Slf4j; import org.snmp4j.PDU; import org.snmp4j.smi.OID; import org.snmp4j.smi.VariableBinding; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; -import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.device.data.DeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileKvMapping; +import org.thingsboard.server.common.data.device.profile.SnmpMapping; import org.thingsboard.server.common.data.device.profile.SnmpProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -45,6 +43,7 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; import org.thingsboard.server.transport.snmp.service.ProtoTransportEntityService; import org.thingsboard.server.transport.snmp.service.SnmpTransportBalancingService; @@ -61,7 +60,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @TbSnmpTransportComponent @@ -80,8 +78,7 @@ public class SnmpTransportContext extends TransportContext { private final Map> profilesPdus = new ConcurrentHashMap<>(); private Collection allSnmpDevicesIds = new ConcurrentLinkedDeque<>(); - @EventListener(ApplicationReadyEvent.class) - @Order(2) + @AfterStartUp(order = 2) public void initDevicesSessions() { log.info("Initializing SNMP devices sessions"); allSnmpDevicesIds = protoEntityService.getAllSnmpDevicesIds().stream() @@ -209,17 +206,17 @@ public class SnmpTransportContext extends TransportContext { } private List createPdus(SnmpProfileTransportConfiguration deviceProfileConfig) { - Map> varBindingPerMethod = new HashMap<>(); + Map> bindingsPerMethod = new HashMap<>(); - deviceProfileConfig.getKvMappings().forEach(mapping -> varBindingPerMethod + deviceProfileConfig.getAllMappings().forEach(mapping -> bindingsPerMethod .computeIfAbsent(mapping.getMethod(), v -> new ArrayList<>()) .add(new VariableBinding(new OID(mapping.getOid())))); - return varBindingPerMethod.keySet().stream() + return bindingsPerMethod.keySet().stream() .map(method -> { PDU request = new PDU(); request.setType(getSnmpMethod(method)); - request.addAll(varBindingPerMethod.get(method)); + request.addAll(bindingsPerMethod.get(method)); return request; }) .collect(Collectors.toList()); @@ -300,31 +297,27 @@ public class SnmpTransportContext extends TransportContext { return profilesPdus; } - public Optional getAttributesMapping(DeviceProfileId deviceProfileId, OID responseOid) { + public Optional getAttributeMapping(DeviceProfileId deviceProfileId, OID responseOid) { if (profilesTransportConfigs.containsKey(deviceProfileId)) { - return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getAttributes()); + return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getAttributesMappings()); } return Optional.empty(); } - public Optional getTelemetryMapping(DeviceProfileId deviceProfileId, OID responseOid) { + public Optional getTelemetryMapping(DeviceProfileId deviceProfileId, OID responseOid) { if (profilesTransportConfigs.containsKey(deviceProfileId)) { - return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getTelemetry()); + return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getTelemetryMappings()); } return Optional.empty(); } - private Optional getMapping(OID responseOid, List mappings) { + private Optional getMapping(OID responseOid, List mappings) { return mappings.stream() .filter(kvMapping -> new OID(kvMapping.getOid()).equals(responseOid)) //TODO: OID shouldn't be duplicated in the config, add backend and UI verification .findFirst(); } - public ExecutorService getSnmpCallbackExecutor() { - return snmpTransportService.getSnmpCallbackExecutor(); - } - private int getSnmpMethod(String configMethod) { switch (configMethod) { case "get": diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 22e60c98ee..3e9eb63943 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -15,114 +15,74 @@ */ package org.thingsboard.server.transport.snmp.service; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import lombok.AllArgsConstructor; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.snmp4j.PDU; import org.snmp4j.Snmp; import org.snmp4j.event.ResponseEvent; import org.snmp4j.smi.VariableBinding; import org.snmp4j.transport.DefaultUdpTransportMapping; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Lazy; -import org.springframework.context.event.EventListener; -import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.TbTransportService; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.kv.DataType; -import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.auth.SessionInfoCreator; -import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.transport.snmp.SnmpTransportContext; +import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; +import org.thingsboard.server.transport.snmp.SnmpTransportContext; import org.thingsboard.server.transport.snmp.session.DeviceSessionContext; +import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; @TbSnmpTransportComponent @Service @Slf4j public class SnmpTransportService implements TbTransportService { private final SnmpTransportContext snmpTransportContext; + private final TransportService transportService; - @Getter - private ExecutorService snmpCallbackExecutor; - @Getter private Snmp snmp; private ScheduledExecutorService pollingExecutor; + private ExecutorService snmpResponseProcessingExecutor; - public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext) { + public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext, + TransportService transportService) { this.snmpTransportContext = snmpTransportContext; + this.transportService = transportService; } - // @PostConstruct - private void init() { - log.info("Starting SNMP transport..."); + @PostConstruct + private void init() throws IOException { + log.info("Initializing SNMP transport service"); + pollingExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("snmp-polling")); //TODO: Set parallelism value in the config - snmpCallbackExecutor = Executors.newWorkStealingPool(20); + snmpResponseProcessingExecutor = Executors.newWorkStealingPool(20); initializeSnmp(); - log.info("SNMP transport started!"); + log.info("SNMP transport service initialized"); } - @PreDestroy - public void shutdown() { - log.info("Stopping SNMP transport!"); - if (pollingExecutor != null) { - pollingExecutor.shutdownNow(); - } - if (snmpCallbackExecutor != null) { - snmpCallbackExecutor.shutdownNow(); - } - if (snmp != null) { - try { - snmp.close(); - } catch (IOException e) { - log.error(e.getMessage(), e); - } - } - log.info("SNMP transport stopped!"); + private void initializeSnmp() throws IOException { + snmp = new Snmp(new DefaultUdpTransportMapping()); + snmp.listen(); } - @EventListener(ApplicationReadyEvent.class) - @Order(value = 10) - public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { - log.info("Received application ready event. Starting SNMP polling."); -// startPolling(); - } - - private void initializeSnmp() { - try { - this.snmp = new Snmp(new DefaultUdpTransportMapping()); - this.snmp.listen(); - } catch (IOException e) { - //TODO: what should be done if transport wasn't initialized? - log.error(e.getMessage(), e); - } - } - - private void startPolling() { + @AfterStartUp(order = 10) + public void startPolling() { + log.info("Starting SNMP polling"); //TODO: Get poll period from configuration; int pollPeriodSeconds = 1; @@ -141,7 +101,7 @@ public class SnmpTransportService implements TbTransportService { DeviceProfileId deviceProfileId = sessionContext.getDeviceProfile().getId(); snmpTransportContext.getProfilesPdus().get(deviceProfileId).forEach(pdu -> { try { - log.debug("[{}] Sending SNMP message...", pdu.getRequestID()); + log.debug("[{}] Sending SNMP message for device {}", pdu.getRequestID(), sessionContext.getDeviceId()); snmp.send(pdu, sessionContext.getTarget(), deviceProfileId, sessionContext); } catch (IOException e) { log.error(e.getMessage(), e); @@ -152,118 +112,84 @@ public class SnmpTransportService implements TbTransportService { public void onNewDeviceResponse(ResponseEvent responseEvent, DeviceSessionContext sessionContext) { ((Snmp) responseEvent.getSource()).cancel(responseEvent.getRequest(), sessionContext); - snmpTransportContext.getSnmpCallbackExecutor().submit(() -> processSnmpResponse(responseEvent, sessionContext)); + snmpResponseProcessingExecutor.submit(() -> processSnmpResponse(responseEvent, sessionContext)); } private void processSnmpResponse(ResponseEvent event, DeviceSessionContext sessionContext) { - PDU response = event.getResponse(); if (event.getError() != null) { log.warn("Response error: {}", event.getError().getMessage(), event.getError()); + return; } - if (response != null) { - log.debug("[{}] Processing SNMP response: {}", response.getRequestID(), response); - - DeviceProfileId deviceProfileId = (DeviceProfileId) event.getUserObject(); - TransportService transportService = snmpTransportContext.getTransportService(); - for (int i = 0; i < response.size(); i++) { - VariableBinding vb = response.get(i); - snmpTransportContext.getAttributesMapping(deviceProfileId, vb.getOid()).ifPresent(kvMapping -> transportService.process(DeviceTransportType.DEFAULT, - TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(sessionContext.getToken()).build(), - new DeviceAuthCallback(snmpTransportContext, sessionInfo -> { - try { - transportService.process(sessionInfo, - convertToPostAttributes(kvMapping.getKey(), kvMapping.getType(), vb.toValueString()), - TransportServiceCallback.EMPTY); - reportActivity(sessionInfo); - } catch (Exception e) { - log.warn("Failed to process SNMP response: {}", e.getMessage(), e); - } - }))); - snmpTransportContext.getTelemetryMapping(deviceProfileId, vb.getOid()).ifPresent(kvMapping -> transportService.process(DeviceTransportType.DEFAULT, - TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(sessionContext.getToken()).build(), - new DeviceAuthCallback(snmpTransportContext, sessionInfo -> { - try { - transportService.process(sessionInfo, - convertToPostTelemetry(kvMapping.getKey(), kvMapping.getType(), vb.toValueString()), - TransportServiceCallback.EMPTY); - reportActivity(sessionInfo); - - } catch (Exception e) { - log.warn("Failed to process SNMP response: {}", e.getMessage(), e); - } - }))); - } - } else { + PDU response = event.getResponse(); + if (response == null) { log.warn("No SNMP response, requestId: {}", event.getRequest().getRequestID()); + return; } + + DeviceProfileId deviceProfileId = (DeviceProfileId) event.getUserObject(); + log.debug("[{}] Processing SNMP response for device {} with device profile {}: {}", + response.getRequestID(), sessionContext.getDeviceId(), deviceProfileId, response); + + JsonObject telemetry = new JsonObject(); + JsonObject attributes = new JsonObject(); + + for (int i = 0; i < response.size(); i++) { + VariableBinding variableBinding = response.get(i); + log.trace("Processing variable binding {}: {}", i, variableBinding); + + snmpTransportContext.getTelemetryMapping(deviceProfileId, variableBinding.getOid()).ifPresent(mapping -> { + log.trace("Found telemetry mapping for oid {}: {}", variableBinding.getOid(), mapping); + processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), telemetry); + }); + + snmpTransportContext.getAttributeMapping(deviceProfileId, variableBinding.getOid()).ifPresent(mapping -> { + log.trace("Found attribute mapping for oid {}: {}", variableBinding.getOid(), mapping); + processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), attributes); + }); + } + + if (telemetry.entrySet().isEmpty() && attributes.entrySet().isEmpty()) { + log.warn("No telemetry or attribute values is the SNMP response for device {}", sessionContext.getDeviceId()); + return; + } + + if (!telemetry.entrySet().isEmpty()) { + TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(telemetry); + transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, TransportServiceCallback.EMPTY); + log.debug("Posted telemetry for device {}: {}", sessionContext.getDeviceId(), telemetry); + } + + if (!attributes.entrySet().isEmpty()) { + TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(attributes); + transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, TransportServiceCallback.EMPTY); + log.debug("Posted attributes for device {}: {}", sessionContext.getDeviceId(), attributes); + } + + reportActivity(sessionContext.getSessionInfo()); } private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { - snmpTransportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() + transportService.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() .setAttributeSubscription(false) .setRpcSubscription(false) .setLastActivityTime(System.currentTimeMillis()) .build(), TransportServiceCallback.EMPTY); } - private TransportProtos.PostAttributeMsg convertToPostAttributes(String keyName, DataType dataType, String payload) throws AdaptorException { - try { - return JsonConverter.convertToAttributesProto(getKvJson(keyName, dataType, payload)); - } catch (IllegalStateException | JsonSyntaxException ex) { - //TODO: change the exception type - throw new AdaptorException(ex); - } - } - - private TransportProtos.PostTelemetryMsg convertToPostTelemetry(String keyName, DataType dataType, String payload) throws AdaptorException { - try { - return JsonConverter.convertToTelemetryProto(getKvJson(keyName, dataType, payload)); - } catch (IllegalStateException | JsonSyntaxException ex) { - //TODO: change the exception type - throw new AdaptorException(ex); - } - } - - private JsonElement getKvJson(String keyName, DataType dataType, String payload) throws AdaptorException { - JsonObject result = new JsonObject(); + private void processValue(String key, DataType dataType, String value, JsonObject result) { switch (dataType) { case LONG: - result.addProperty(keyName, Long.parseLong(payload)); + result.addProperty(key, Long.parseLong(value)); break; case BOOLEAN: - result.addProperty(keyName, Boolean.parseBoolean(payload)); + result.addProperty(key, Boolean.parseBoolean(value)); break; case DOUBLE: - result.addProperty(keyName, Double.parseDouble(payload)); - break; - case STRING: - result.addProperty(keyName, payload); + result.addProperty(key, Double.parseDouble(value)); break; default: - //TODO: change the exception type - throw new AdaptorException("Unsupported data type"); - } - return new JsonParser().parse(result.toString()); - } - - @AllArgsConstructor - private static class DeviceAuthCallback implements TransportServiceCallback { - private final TransportContext transportContext; - private final Consumer onSuccess; - - @Override - public void onSuccess(ValidateDeviceCredentialsResponse msg) { - if (msg.hasDeviceInfo()) { - onSuccess.accept(SessionInfoCreator.create(msg, transportContext, UUID.randomUUID())); - } else { - log.warn("Failed to process device auth"); - } - } - - @Override - public void onError(Throwable e) { - log.warn("Failed to process device auth", e); + result.addProperty(key, value); } } @@ -271,4 +197,23 @@ public class SnmpTransportService implements TbTransportService { public String getName() { return "SNMP"; } + + @PreDestroy + public void shutdown() { + log.info("Stopping SNMP transport!"); + if (pollingExecutor != null) { + pollingExecutor.shutdownNow(); + } + if (snmpResponseProcessingExecutor != null) { + snmpResponseProcessingExecutor.shutdownNow(); + } + if (snmp != null) { + try { + snmp.close(); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + log.info("SNMP transport stopped!"); + } }