This commit is contained in:
Viacheslav Klimov 2021-03-24 12:05:43 +02:00 committed by Andrew Shvayka
parent 8f2438d6ab
commit 039508bfbd
7 changed files with 185 additions and 179 deletions

View File

@ -18,11 +18,10 @@ package org.thingsboard.server.common.data.device.profile;
import lombok.Data;
import org.thingsboard.server.common.data.kv.DataType;
//TODO: rename class
@Data
public class SnmpDeviceProfileKvMapping {
private String key;
private DataType type;
private String method;
public class SnmpMapping {
private String oid;
private String method;
private String key;
private DataType dataType;
}

View File

@ -29,8 +29,8 @@ public class SnmpProfileTransportConfiguration implements DeviceProfileTransport
private int pollPeriodMs;
private int timeoutMs;
private int retries;
private List<SnmpDeviceProfileKvMapping> attributes;
private List<SnmpDeviceProfileKvMapping> telemetry;
private List<SnmpMapping> attributesMappings;
private List<SnmpMapping> telemetryMappings;
@Override
public DeviceTransportType getType() {
@ -38,9 +38,9 @@ public class SnmpProfileTransportConfiguration implements DeviceProfileTransport
}
@JsonIgnore
public List<SnmpDeviceProfileKvMapping> getKvMappings() {
if (attributes != null && telemetry != null) {
return Stream.concat(attributes.stream(), telemetry.stream()).collect(Collectors.toList());
public List<SnmpMapping> getAllMappings() {
if (attributesMappings != null && telemetryMappings != null) {
return Stream.concat(attributesMappings.stream(), telemetryMappings.stream()).collect(Collectors.toList());
} else {
return Collections.emptyList();
}

View File

@ -20,8 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.TbTransportService;
@ -31,6 +29,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.util.AfterContextReady;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
@ -109,7 +108,7 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
serviceInfo = builder.build();
}
@EventListener(ContextRefreshedEvent.class)
@AfterContextReady
public void setTransports() {
serviceInfo = ServiceInfo.newBuilder(serviceInfo)
.addAllTransports(getTransportServices().stream()

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.AliasFor;
import org.springframework.core.annotation.Order;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@EventListener(ContextRefreshedEvent.class)
@Order
public @interface AfterContextReady {
@AliasFor(annotation = Order.class, attribute = "value")
int order() default Integer.MAX_VALUE;
}

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.AliasFor;
import org.springframework.core.annotation.Order;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@EventListener(ApplicationReadyEvent.class)
@Order
public @interface AfterStartUp {
@AliasFor(annotation = Order.class, attribute = "value")
int order() default Integer.MAX_VALUE;
}

View File

@ -20,16 +20,14 @@ import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.VariableBinding;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.device.data.DeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileKvMapping;
import org.thingsboard.server.common.data.device.profile.SnmpMapping;
import org.thingsboard.server.common.data.device.profile.SnmpProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -45,6 +43,7 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.service.ProtoTransportEntityService;
import org.thingsboard.server.transport.snmp.service.SnmpTransportBalancingService;
@ -61,7 +60,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@TbSnmpTransportComponent
@ -80,8 +78,7 @@ public class SnmpTransportContext extends TransportContext {
private final Map<DeviceProfileId, List<PDU>> profilesPdus = new ConcurrentHashMap<>();
private Collection<DeviceId> allSnmpDevicesIds = new ConcurrentLinkedDeque<>();
@EventListener(ApplicationReadyEvent.class)
@Order(2)
@AfterStartUp(order = 2)
public void initDevicesSessions() {
log.info("Initializing SNMP devices sessions");
allSnmpDevicesIds = protoEntityService.getAllSnmpDevicesIds().stream()
@ -209,17 +206,17 @@ public class SnmpTransportContext extends TransportContext {
}
private List<PDU> createPdus(SnmpProfileTransportConfiguration deviceProfileConfig) {
Map<String, List<VariableBinding>> varBindingPerMethod = new HashMap<>();
Map<String, List<VariableBinding>> bindingsPerMethod = new HashMap<>();
deviceProfileConfig.getKvMappings().forEach(mapping -> varBindingPerMethod
deviceProfileConfig.getAllMappings().forEach(mapping -> bindingsPerMethod
.computeIfAbsent(mapping.getMethod(), v -> new ArrayList<>())
.add(new VariableBinding(new OID(mapping.getOid()))));
return varBindingPerMethod.keySet().stream()
return bindingsPerMethod.keySet().stream()
.map(method -> {
PDU request = new PDU();
request.setType(getSnmpMethod(method));
request.addAll(varBindingPerMethod.get(method));
request.addAll(bindingsPerMethod.get(method));
return request;
})
.collect(Collectors.toList());
@ -300,31 +297,27 @@ public class SnmpTransportContext extends TransportContext {
return profilesPdus;
}
public Optional<SnmpDeviceProfileKvMapping> getAttributesMapping(DeviceProfileId deviceProfileId, OID responseOid) {
public Optional<SnmpMapping> getAttributeMapping(DeviceProfileId deviceProfileId, OID responseOid) {
if (profilesTransportConfigs.containsKey(deviceProfileId)) {
return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getAttributes());
return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getAttributesMappings());
}
return Optional.empty();
}
public Optional<SnmpDeviceProfileKvMapping> getTelemetryMapping(DeviceProfileId deviceProfileId, OID responseOid) {
public Optional<SnmpMapping> getTelemetryMapping(DeviceProfileId deviceProfileId, OID responseOid) {
if (profilesTransportConfigs.containsKey(deviceProfileId)) {
return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getTelemetry());
return getMapping(responseOid, profilesTransportConfigs.get(deviceProfileId).getTelemetryMappings());
}
return Optional.empty();
}
private Optional<SnmpDeviceProfileKvMapping> getMapping(OID responseOid, List<SnmpDeviceProfileKvMapping> mappings) {
private Optional<SnmpMapping> getMapping(OID responseOid, List<SnmpMapping> mappings) {
return mappings.stream()
.filter(kvMapping -> new OID(kvMapping.getOid()).equals(responseOid))
//TODO: OID shouldn't be duplicated in the config, add backend and UI verification
.findFirst();
}
public ExecutorService getSnmpCallbackExecutor() {
return snmpTransportService.getSnmpCallbackExecutor();
}
private int getSnmpMethod(String configMethod) {
switch (configMethod) {
case "get":

View File

@ -15,114 +15,74 @@
*/
package org.thingsboard.server.transport.snmp.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU;
import org.snmp4j.Snmp;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.SessionInfoCreator;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.snmp.SnmpTransportContext;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.SnmpTransportContext;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@TbSnmpTransportComponent
@Service
@Slf4j
public class SnmpTransportService implements TbTransportService {
private final SnmpTransportContext snmpTransportContext;
private final TransportService transportService;
@Getter
private ExecutorService snmpCallbackExecutor;
@Getter
private Snmp snmp;
private ScheduledExecutorService pollingExecutor;
private ExecutorService snmpResponseProcessingExecutor;
public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext) {
public SnmpTransportService(@Lazy SnmpTransportContext snmpTransportContext,
TransportService transportService) {
this.snmpTransportContext = snmpTransportContext;
this.transportService = transportService;
}
// @PostConstruct
private void init() {
log.info("Starting SNMP transport...");
@PostConstruct
private void init() throws IOException {
log.info("Initializing SNMP transport service");
pollingExecutor = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("snmp-polling"));
//TODO: Set parallelism value in the config
snmpCallbackExecutor = Executors.newWorkStealingPool(20);
snmpResponseProcessingExecutor = Executors.newWorkStealingPool(20);
initializeSnmp();
log.info("SNMP transport started!");
log.info("SNMP transport service initialized");
}
@PreDestroy
public void shutdown() {
log.info("Stopping SNMP transport!");
if (pollingExecutor != null) {
pollingExecutor.shutdownNow();
}
if (snmpCallbackExecutor != null) {
snmpCallbackExecutor.shutdownNow();
}
if (snmp != null) {
try {
snmp.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
log.info("SNMP transport stopped!");
private void initializeSnmp() throws IOException {
snmp = new Snmp(new DefaultUdpTransportMapping());
snmp.listen();
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 10)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
log.info("Received application ready event. Starting SNMP polling.");
// startPolling();
}
private void initializeSnmp() {
try {
this.snmp = new Snmp(new DefaultUdpTransportMapping());
this.snmp.listen();
} catch (IOException e) {
//TODO: what should be done if transport wasn't initialized?
log.error(e.getMessage(), e);
}
}
private void startPolling() {
@AfterStartUp(order = 10)
public void startPolling() {
log.info("Starting SNMP polling");
//TODO: Get poll period from configuration;
int pollPeriodSeconds = 1;
@ -141,7 +101,7 @@ public class SnmpTransportService implements TbTransportService {
DeviceProfileId deviceProfileId = sessionContext.getDeviceProfile().getId();
snmpTransportContext.getProfilesPdus().get(deviceProfileId).forEach(pdu -> {
try {
log.debug("[{}] Sending SNMP message...", pdu.getRequestID());
log.debug("[{}] Sending SNMP message for device {}", pdu.getRequestID(), sessionContext.getDeviceId());
snmp.send(pdu, sessionContext.getTarget(), deviceProfileId, sessionContext);
} catch (IOException e) {
log.error(e.getMessage(), e);
@ -152,118 +112,84 @@ public class SnmpTransportService implements TbTransportService {
public void onNewDeviceResponse(ResponseEvent responseEvent, DeviceSessionContext sessionContext) {
((Snmp) responseEvent.getSource()).cancel(responseEvent.getRequest(), sessionContext);
snmpTransportContext.getSnmpCallbackExecutor().submit(() -> processSnmpResponse(responseEvent, sessionContext));
snmpResponseProcessingExecutor.submit(() -> processSnmpResponse(responseEvent, sessionContext));
}
private void processSnmpResponse(ResponseEvent event, DeviceSessionContext sessionContext) {
PDU response = event.getResponse();
if (event.getError() != null) {
log.warn("Response error: {}", event.getError().getMessage(), event.getError());
return;
}
if (response != null) {
log.debug("[{}] Processing SNMP response: {}", response.getRequestID(), response);
DeviceProfileId deviceProfileId = (DeviceProfileId) event.getUserObject();
TransportService transportService = snmpTransportContext.getTransportService();
for (int i = 0; i < response.size(); i++) {
VariableBinding vb = response.get(i);
snmpTransportContext.getAttributesMapping(deviceProfileId, vb.getOid()).ifPresent(kvMapping -> transportService.process(DeviceTransportType.DEFAULT,
TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(sessionContext.getToken()).build(),
new DeviceAuthCallback(snmpTransportContext, sessionInfo -> {
try {
transportService.process(sessionInfo,
convertToPostAttributes(kvMapping.getKey(), kvMapping.getType(), vb.toValueString()),
TransportServiceCallback.EMPTY);
reportActivity(sessionInfo);
} catch (Exception e) {
log.warn("Failed to process SNMP response: {}", e.getMessage(), e);
}
})));
snmpTransportContext.getTelemetryMapping(deviceProfileId, vb.getOid()).ifPresent(kvMapping -> transportService.process(DeviceTransportType.DEFAULT,
TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(sessionContext.getToken()).build(),
new DeviceAuthCallback(snmpTransportContext, sessionInfo -> {
try {
transportService.process(sessionInfo,
convertToPostTelemetry(kvMapping.getKey(), kvMapping.getType(), vb.toValueString()),
TransportServiceCallback.EMPTY);
reportActivity(sessionInfo);
} catch (Exception e) {
log.warn("Failed to process SNMP response: {}", e.getMessage(), e);
}
})));
}
} else {
PDU response = event.getResponse();
if (response == null) {
log.warn("No SNMP response, requestId: {}", event.getRequest().getRequestID());
return;
}
DeviceProfileId deviceProfileId = (DeviceProfileId) event.getUserObject();
log.debug("[{}] Processing SNMP response for device {} with device profile {}: {}",
response.getRequestID(), sessionContext.getDeviceId(), deviceProfileId, response);
JsonObject telemetry = new JsonObject();
JsonObject attributes = new JsonObject();
for (int i = 0; i < response.size(); i++) {
VariableBinding variableBinding = response.get(i);
log.trace("Processing variable binding {}: {}", i, variableBinding);
snmpTransportContext.getTelemetryMapping(deviceProfileId, variableBinding.getOid()).ifPresent(mapping -> {
log.trace("Found telemetry mapping for oid {}: {}", variableBinding.getOid(), mapping);
processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), telemetry);
});
snmpTransportContext.getAttributeMapping(deviceProfileId, variableBinding.getOid()).ifPresent(mapping -> {
log.trace("Found attribute mapping for oid {}: {}", variableBinding.getOid(), mapping);
processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), attributes);
});
}
if (telemetry.entrySet().isEmpty() && attributes.entrySet().isEmpty()) {
log.warn("No telemetry or attribute values is the SNMP response for device {}", sessionContext.getDeviceId());
return;
}
if (!telemetry.entrySet().isEmpty()) {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(telemetry);
transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, TransportServiceCallback.EMPTY);
log.debug("Posted telemetry for device {}: {}", sessionContext.getDeviceId(), telemetry);
}
if (!attributes.entrySet().isEmpty()) {
TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(attributes);
transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, TransportServiceCallback.EMPTY);
log.debug("Posted attributes for device {}: {}", sessionContext.getDeviceId(), attributes);
}
reportActivity(sessionContext.getSessionInfo());
}
private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
snmpTransportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
transportService.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(false)
.setRpcSubscription(false)
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
}
private TransportProtos.PostAttributeMsg convertToPostAttributes(String keyName, DataType dataType, String payload) throws AdaptorException {
try {
return JsonConverter.convertToAttributesProto(getKvJson(keyName, dataType, payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
//TODO: change the exception type
throw new AdaptorException(ex);
}
}
private TransportProtos.PostTelemetryMsg convertToPostTelemetry(String keyName, DataType dataType, String payload) throws AdaptorException {
try {
return JsonConverter.convertToTelemetryProto(getKvJson(keyName, dataType, payload));
} catch (IllegalStateException | JsonSyntaxException ex) {
//TODO: change the exception type
throw new AdaptorException(ex);
}
}
private JsonElement getKvJson(String keyName, DataType dataType, String payload) throws AdaptorException {
JsonObject result = new JsonObject();
private void processValue(String key, DataType dataType, String value, JsonObject result) {
switch (dataType) {
case LONG:
result.addProperty(keyName, Long.parseLong(payload));
result.addProperty(key, Long.parseLong(value));
break;
case BOOLEAN:
result.addProperty(keyName, Boolean.parseBoolean(payload));
result.addProperty(key, Boolean.parseBoolean(value));
break;
case DOUBLE:
result.addProperty(keyName, Double.parseDouble(payload));
break;
case STRING:
result.addProperty(keyName, payload);
result.addProperty(key, Double.parseDouble(value));
break;
default:
//TODO: change the exception type
throw new AdaptorException("Unsupported data type");
}
return new JsonParser().parse(result.toString());
}
@AllArgsConstructor
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponse> {
private final TransportContext transportContext;
private final Consumer<TransportProtos.SessionInfoProto> onSuccess;
@Override
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (msg.hasDeviceInfo()) {
onSuccess.accept(SessionInfoCreator.create(msg, transportContext, UUID.randomUUID()));
} else {
log.warn("Failed to process device auth");
}
}
@Override
public void onError(Throwable e) {
log.warn("Failed to process device auth", e);
result.addProperty(key, value);
}
}
@ -271,4 +197,23 @@ public class SnmpTransportService implements TbTransportService {
public String getName() {
return "SNMP";
}
@PreDestroy
public void shutdown() {
log.info("Stopping SNMP transport!");
if (pollingExecutor != null) {
pollingExecutor.shutdownNow();
}
if (snmpResponseProcessingExecutor != null) {
snmpResponseProcessingExecutor.shutdownNow();
}
if (snmp != null) {
try {
snmp.close();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
log.info("SNMP transport stopped!");
}
}