This commit is contained in:
Viacheslav Klimov 2021-04-14 15:51:13 +03:00
parent 415bf570ba
commit 783b959577
16 changed files with 455 additions and 235 deletions

View File

@ -17,16 +17,11 @@ package org.thingsboard.server.common.data.device.profile;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import org.apache.commons.lang3.ArrayUtils;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.configs.SnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Data
public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTransportConfiguration {
@ -51,7 +46,7 @@ public class SnmpDeviceProfileTransportConfiguration implements DeviceProfileTra
return timeoutMs != null && timeoutMs >= 0 && retries != null && retries >= 0
&& communicationConfigs != null && !communicationConfigs.isEmpty()
&& communicationConfigs.stream().allMatch(config -> config != null && config.isValid())
&& communicationConfigs.stream().flatMap(config -> config.getMappings().stream()).map(SnmpMapping::getOid)
.distinct().count() == communicationConfigs.stream().mapToInt(config -> config.getMappings().size()).sum();
&& communicationConfigs.stream().flatMap(config -> config.getAllMappings().stream()).map(SnmpMapping::getOid)
.distinct().count() == communicationConfigs.stream().mapToInt(config -> config.getAllMappings().size()).sum();
}
}

View File

@ -16,22 +16,11 @@
package org.thingsboard.server.common.data.transport.snmp;
public enum SnmpCommunicationSpec {
TELEMETRY_QUERYING(true),
CLIENT_ATTRIBUTES_QUERYING(true),
TELEMETRY_QUERYING,
SHARED_ATTRIBUTES_SETTING;
CLIENT_ATTRIBUTES_QUERYING,
SHARED_ATTRIBUTES_SETTING,
private final boolean isRepeatingQuerying;
SnmpCommunicationSpec() {
this.isRepeatingQuerying = false;
}
SnmpCommunicationSpec(boolean isRepeatingQuerying) {
this.isRepeatingQuerying = isRepeatingQuerying;
}
public boolean isRepeatingQuerying() {
return isRepeatingQuerying;
}
TO_DEVICE_RPC_COMMAND_SETTING,
TO_DEVICE_RPC_RESPONSE_QUERYING
}

View File

@ -16,13 +16,17 @@
package org.thingsboard.server.common.data.transport.snmp;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.kv.DataType;
import java.util.regex.Pattern;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SnmpMapping {
private String oid;
private String key;
@ -32,7 +36,6 @@ public class SnmpMapping {
@JsonIgnore
public boolean isValid() {
return StringUtils.isNotEmpty(oid) && OID_PATTERN.matcher(oid).matches() &&
StringUtils.isNotBlank(key) && dataType != null;
return StringUtils.isNotEmpty(oid) && OID_PATTERN.matcher(oid).matches() && StringUtils.isNotBlank(key);
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.config;
import lombok.Data;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import java.util.List;
@Data
public abstract class MultipleMappingsSnmpCommunicationConfig implements SnmpCommunicationConfig {
protected List<SnmpMapping> mappings;
@Override
public boolean isValid() {
return mappings != null && !mappings.isEmpty() && mappings.stream().allMatch(mapping -> mapping != null && mapping.isValid());
}
@Override
public List<SnmpMapping> getAllMappings() {
return mappings;
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.configs;
package org.thingsboard.server.common.data.transport.snmp.config;
import lombok.Data;
import lombok.EqualsAndHashCode;
@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
@EqualsAndHashCode(callSuper = true)
@Data
public abstract class RepeatingQueryingSnmpCommunicationConfig extends SnmpCommunicationConfig {
public abstract class RepeatingQueryingSnmpCommunicationConfig extends MultipleMappingsSnmpCommunicationConfig {
private Long queryingFrequencyMs;
@Override
@ -31,6 +31,6 @@ public abstract class RepeatingQueryingSnmpCommunicationConfig extends SnmpCommu
@Override
public boolean isValid() {
return super.isValid() && queryingFrequencyMs != null && queryingFrequencyMs > 0;
return queryingFrequencyMs != null && queryingFrequencyMs > 0 && super.isValid();
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.configs;
package org.thingsboard.server.common.data.transport.snmp.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.impl.ClientAttributesQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.impl.SharedAttributesSettingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.impl.TelemetryQueryingSnmpCommunicationConfig;
import java.util.List;
@ -33,22 +36,19 @@ import java.util.List;
@Type(value = ClientAttributesQueryingSnmpCommunicationConfig.class, name = "CLIENT_ATTRIBUTES_QUERYING"),
@Type(value = SharedAttributesSettingSnmpCommunicationConfig.class, name = "SHARED_ATTRIBUTES_SETTING")
})
public abstract class SnmpCommunicationConfig {
protected List<SnmpMapping> mappings;
public interface SnmpCommunicationConfig {
public List<SnmpMapping> getMappings() {
return mappings;
}
public abstract SnmpCommunicationSpec getSpec();
SnmpCommunicationSpec getSpec();
@JsonIgnore
public SnmpMethod getMethod() {
default SnmpMethod getMethod() {
return null;
}
@JsonIgnore
public boolean isValid() {
return mappings != null && !mappings.isEmpty() && mappings.stream().allMatch(mapping -> mapping != null && mapping.isValid());
}
List<SnmpMapping> getAllMappings();
@JsonIgnore
boolean isValid();
}

View File

@ -13,13 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.configs;
package org.thingsboard.server.common.data.transport.snmp.config.impl;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
public class ClientAttributesQueryingSnmpCommunicationConfig extends RepeatingQueryingSnmpCommunicationConfig {
@Override
public SnmpCommunicationSpec getSpec() {
return SnmpCommunicationSpec.CLIENT_ATTRIBUTES_QUERYING;
}
}

View File

@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.configs;
package org.thingsboard.server.common.data.transport.snmp.config.impl;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.MultipleMappingsSnmpCommunicationConfig;
public class SharedAttributesSettingSnmpCommunicationConfig extends MultipleMappingsSnmpCommunicationConfig {
public class SharedAttributesSettingSnmpCommunicationConfig extends SnmpCommunicationConfig {
@Override
public SnmpCommunicationSpec getSpec() {
return SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING;
@ -28,4 +30,5 @@ public class SharedAttributesSettingSnmpCommunicationConfig extends SnmpCommunic
public SnmpMethod getMethod() {
return SnmpMethod.SET;
}
}

View File

@ -13,17 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.configs;
package org.thingsboard.server.common.data.transport.snmp.config.impl;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
@EqualsAndHashCode(callSuper = true)
@Data
public class TelemetryQueryingSnmpCommunicationConfig extends RepeatingQueryingSnmpCommunicationConfig {
@Override
public SnmpCommunicationSpec getSpec() {
return SnmpCommunicationSpec.TELEMETRY_QUERYING;
}
}

View File

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.config.impl;
import lombok.Data;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@Data
public class ToDeviceRpcCommandSettingSnmpCommunicationConfig implements SnmpCommunicationConfig {
private SnmpMapping mapping;
@Override
public SnmpCommunicationSpec getSpec() {
return SnmpCommunicationSpec.TO_DEVICE_RPC_COMMAND_SETTING;
}
@Override
public SnmpMethod getMethod() {
return SnmpMethod.SET;
}
public void setMapping(SnmpMapping mapping) {
this.mapping = mapping != null ? new SnmpMapping(mapping.getOid(), RPC_COMMAND_KEY_NAME, DataType.STRING) : null;
}
@Override
public List<SnmpMapping> getAllMappings() {
return Collections.singletonList(mapping);
}
@Override
public boolean isValid() {
return mapping != null && mapping.isValid();
}
public static final String RPC_COMMAND_KEY_NAME = "rpcCommand";
}

View File

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.transport.snmp.config.impl;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.SnmpMethod;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
import java.util.Collections;
import java.util.List;
@EqualsAndHashCode(callSuper = true)
@Data
public class ToDeviceRpcResponseQueryingSnmpCommunicationConfig extends RepeatingQueryingSnmpCommunicationConfig {
private SnmpMapping mapping;
@Override
public SnmpCommunicationSpec getSpec() {
return SnmpCommunicationSpec.TO_DEVICE_RPC_RESPONSE_QUERYING;
}
@Override
public SnmpMethod getMethod() {
return SnmpMethod.GET;
}
public void setMapping(SnmpMapping mapping) {
this.mapping = mapping != null ? new SnmpMapping(mapping.getOid(), RPC_RESPONSE_KEY_NAME, DataType.STRING) : null;
}
@Override
public List<SnmpMapping> getAllMappings() {
return Collections.singletonList(mapping);
}
@Override
public boolean isValid() {
return true;
}
public static final String RPC_RESPONSE_KEY_NAME = "rpcResponse";
}

View File

@ -42,6 +42,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.service.ProtoTransportEntityService;
import org.thingsboard.server.transport.snmp.service.SnmpAuthService;
import org.thingsboard.server.transport.snmp.service.SnmpTransportBalancingService;
import org.thingsboard.server.transport.snmp.service.SnmpTransportService;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
@ -116,7 +117,7 @@ public class SnmpTransportContext extends TransportContext {
);
registerSessionMsgListener(deviceSessionContext);
} catch (Exception e) {
log.error("Failed to establish session for SNMP device {}: {}", device.getId(), e.getMessage());
log.error("Failed to establish session for SNMP device {}: {}", device.getId(), e.toString());
return;
}
sessions.put(device.getId(), deviceSessionContext);
@ -177,7 +178,9 @@ public class SnmpTransportContext extends TransportContext {
);
transportService.registerAsyncSession(sessionInfo, deviceSessionContext);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), TransportServiceCallback.EMPTY);
deviceSessionContext.setSessionInfo(sessionInfo);
deviceSessionContext.setDeviceInfo(msg.getDeviceInfo());
} else {

View File

@ -0,0 +1,150 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.snmp.service;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.snmp4j.PDU;
import org.snmp4j.ScopedPDU;
import org.snmp4j.smi.Integer32;
import org.snmp4j.smi.Null;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.Variable;
import org.snmp4j.smi.VariableBinding;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.SnmpProtocolVersion;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@TbSnmpTransportComponent
@Service
@Slf4j
public class PduMapper {
public PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
PDU pdu;
SnmpDeviceTransportConfiguration deviceTransportConfiguration = sessionContext.getDeviceTransportConfiguration();
SnmpProtocolVersion snmpVersion = deviceTransportConfiguration.getProtocolVersion();
switch (snmpVersion) {
case V1:
case V2C:
pdu = new PDU();
break;
case V3:
ScopedPDU scopedPdu = new ScopedPDU();
scopedPdu.setContextName(new OctetString(deviceTransportConfiguration.getContextName()));
scopedPdu.setContextEngineID(new OctetString(deviceTransportConfiguration.getEngineId()));
pdu = scopedPdu;
break;
default:
throw new UnsupportedOperationException("SNMP version " + snmpVersion + " is not supported");
}
pdu.setType(communicationConfig.getMethod().getCode());
pdu.addAll(communicationConfig.getAllMappings().stream()
.filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
.map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
.map(value -> {
Variable variable = toSnmpVariable(mapping, value);
return new VariableBinding(new OID(mapping.getOid()), variable);
})
.orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
.collect(Collectors.toList()));
return pdu;
}
private Variable toSnmpVariable(SnmpMapping mapping, String value) {
Variable variable;
switch (mapping.getDataType()) {
case LONG:
try {
variable = new Integer32(Integer.parseInt(value));
break;
} catch (NumberFormatException ignored) {
}
case DOUBLE:
case BOOLEAN:
case STRING:
case JSON:
default:
variable = new OctetString(value);
}
return variable;
}
public JsonObject processPdu(PDU pdu, DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {
List<VariableBinding> variablesBindings = IntStream.range(0, pdu.size())
.mapToObj(pdu::get)
.filter(Objects::nonNull)
.filter(variableBinding -> !(variableBinding.getVariable() instanceof Null))
.collect(Collectors.toList());
JsonObject data = new JsonObject();
Map<OID, SnmpMapping> mappings = new HashMap<>();
for (SnmpMapping mapping : communicationConfig.getAllMappings()) {
OID oid = new OID(mapping.getOid());
mappings.put(oid, mapping);
}
variablesBindings.forEach(variableBinding -> {
log.trace("Processing variable binding: {}", variableBinding);
OID oid = variableBinding.getOid();
SnmpMapping mapping = mappings.get(oid);
if (mapping == null) {
log.debug("No SNMP mapping for oid {}", oid);
return;
}
processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), data);
});
return data;
}
private void processValue(String key, DataType dataType, String value, JsonObject result) {
switch (dataType) {
case LONG:
result.addProperty(key, Long.parseLong(value));
break;
case BOOLEAN:
result.addProperty(key, Boolean.parseBoolean(value));
break;
case DOUBLE:
result.addProperty(key, Double.parseDouble(value));
break;
case STRING:
case JSON:
default:
result.addProperty(key, value);
}
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.snmp;
package org.thingsboard.server.transport.snmp.service;
import lombok.RequiredArgsConstructor;
import org.snmp4j.AbstractTarget;
@ -24,6 +24,7 @@ import org.snmp4j.security.SecurityLevel;
import org.snmp4j.security.SecurityModel;
import org.snmp4j.security.SecurityProtocols;
import org.snmp4j.security.USM;
import org.snmp4j.smi.Address;
import org.snmp4j.smi.GenericAddress;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
@ -36,6 +37,8 @@ import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import org.thingsboard.server.transport.snmp.service.SnmpTransportService;
import org.thingsboard.server.transport.snmp.session.DeviceSessionContext;
import java.util.Optional;
@Service
@TbSnmpTransportComponent
@RequiredArgsConstructor
@ -97,7 +100,8 @@ public class SnmpAuthService {
throw new UnsupportedOperationException("SNMP protocol version " + protocolVersion + " is not supported");
}
target.setAddress(GenericAddress.parse(snmpUnderlyingProtocol + ":" + deviceTransportConfig.getHost() + "/" + deviceTransportConfig.getPort()));
Address address = GenericAddress.parse(snmpUnderlyingProtocol + ":" + deviceTransportConfig.getHost() + "/" + deviceTransportConfig.getPort());
target.setAddress(Optional.ofNullable(address).orElseThrow(() -> new IllegalArgumentException("Address of the SNMP device is invalid")));
target.setTimeout(profileTransportConfig.getTimeoutMs());
target.setRetries(profileTransportConfig.getRetries());
target.setVersion(protocolVersion.getCode());

View File

@ -16,11 +16,11 @@
package org.thingsboard.server.transport.snmp.service;
import com.google.gson.JsonObject;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.snmp4j.PDU;
import org.snmp4j.ScopedPDU;
import org.snmp4j.Snmp;
import org.snmp4j.TransportMapping;
import org.snmp4j.event.ResponseEvent;
@ -28,26 +28,18 @@ import org.snmp4j.mp.MPv3;
import org.snmp4j.security.SecurityModels;
import org.snmp4j.security.SecurityProtocols;
import org.snmp4j.security.USM;
import org.snmp4j.smi.Integer32;
import org.snmp4j.smi.Null;
import org.snmp4j.smi.OID;
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.Variable;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultTcpTransportMapping;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.SnmpMapping;
import org.thingsboard.server.common.data.transport.snmp.SnmpProtocolVersion;
import org.thingsboard.server.common.data.transport.snmp.configs.RepeatingQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.configs.SnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.impl.ToDeviceRpcResponseQueryingSnmpCommunicationConfig;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
@ -60,44 +52,37 @@ import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@TbSnmpTransportComponent
@Service
@Slf4j
@RequiredArgsConstructor
public class SnmpTransportService implements TbTransportService {
private final TransportService transportService;
private final PduMapper pduMapper;
@Getter
private Snmp snmp;
private ScheduledExecutorService queryingExecutor;
private ExecutorService responseProcessingExecutor;
private final Map<SnmpCommunicationSpec, BiConsumer<JsonObject, DeviceSessionContext>> responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class);
private final Map<SnmpCommunicationSpec, ResponseProcessor> responseProcessors = new EnumMap<>(SnmpCommunicationSpec.class);
@Value("${transport.snmp.response_processing.parallelism_level}")
private Integer responseProcessingParallelismLevel;
@Value("${transport.snmp.underlying_protocol}")
private String snmpUnderlyingProtocol;
public SnmpTransportService(TransportService transportService) {
this.transportService = transportService;
}
@PostConstruct
private void init() throws IOException {
log.info("Initializing SNMP transport service");
queryingExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("snmp-querying"));
responseProcessingExecutor = Executors.newWorkStealingPool(responseProcessingParallelismLevel);
@ -128,26 +113,23 @@ public class SnmpTransportService implements TbTransportService {
public void createQueryingTasks(DeviceSessionContext sessionContext) {
List<ScheduledFuture<?>> queryingTasks = sessionContext.getProfileTransportConfiguration().getCommunicationConfigs().stream()
.filter(config -> config.getSpec().isRepeatingQuerying())
.filter(communicationConfig -> communicationConfig instanceof RepeatingQueryingSnmpCommunicationConfig)
.map(config -> {
RepeatingQueryingSnmpCommunicationConfig repeatingCommunicationConfig = (RepeatingQueryingSnmpCommunicationConfig) config;
return createQueryingTaskForConfig(sessionContext, repeatingCommunicationConfig);
Long queryingFrequency = repeatingCommunicationConfig.getQueryingFrequencyMs();
return queryingExecutor.scheduleWithFixedDelay(() -> {
try {
if (sessionContext.isActive()) {
sendRequest(sessionContext, repeatingCommunicationConfig);
}
} catch (Exception e) {
log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString());
}
}, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS);
})
.collect(Collectors.toList());
sessionContext.setQueryingTasks(queryingTasks);
}
private ScheduledFuture<?> createQueryingTaskForConfig(DeviceSessionContext sessionContext, RepeatingQueryingSnmpCommunicationConfig communicationConfig) {
Long queryingFrequency = communicationConfig.getQueryingFrequencyMs();
return queryingExecutor.scheduleWithFixedDelay(() -> {
try {
if (sessionContext.isActive()) {
sendRequest(sessionContext, communicationConfig);
}
} catch (Exception e) {
log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString());
}
}, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS);
sessionContext.getQueryingTasks().addAll(queryingTasks);
}
public void cancelQueryingTasks(DeviceSessionContext sessionContext) {
@ -155,75 +137,31 @@ public class SnmpTransportService implements TbTransportService {
sessionContext.getQueryingTasks().clear();
}
public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) throws IOException {
public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig) {
sendRequest(sessionContext, communicationConfig, Collections.emptyMap());
}
public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) throws IOException {
PDU request = createPdu(sessionContext, communicationConfig, values);
executeRequest(sessionContext, request);
}
public void sendRequest(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
PDU request = pduMapper.createPdu(sessionContext, communicationConfig, values);
private void executeRequest(DeviceSessionContext sessionContext, PDU request) throws IOException {
if (request.size() > 0) {
log.trace("Executing SNMP request for device {}. Variables bindings: {}", sessionContext.getDeviceId(), request.getVariableBindings());
snmp.send(request, sessionContext.getTarget(), sessionContext.getDeviceProfile().getId(), sessionContext);
RequestInfo requestInfo = new RequestInfo(sessionContext.getDeviceProfile().getId(), communicationConfig);
try {
snmp.send(request, sessionContext.getTarget(), requestInfo, sessionContext);
} catch (IOException e) {
log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString());
}
}
}
private PDU createPdu(DeviceSessionContext sessionContext, SnmpCommunicationConfig communicationConfig, Map<String, String> values) {
PDU pdu;
SnmpDeviceTransportConfiguration deviceTransportConfiguration = sessionContext.getDeviceTransportConfiguration();
SnmpProtocolVersion snmpVersion = deviceTransportConfiguration.getProtocolVersion();
switch (snmpVersion) {
case V1:
case V2C:
pdu = new PDU();
break;
case V3:
ScopedPDU scopedPdu = new ScopedPDU();
scopedPdu.setContextName(new OctetString(deviceTransportConfiguration.getContextName()));
scopedPdu.setContextEngineID(new OctetString(deviceTransportConfiguration.getEngineId()));
pdu = scopedPdu;
break;
default:
throw new UnsupportedOperationException("SNMP version " + snmpVersion + " is not supported");
}
pdu.setType(communicationConfig.getMethod().getCode());
pdu.addAll(communicationConfig.getMappings().stream()
.filter(mapping -> values.isEmpty() || values.containsKey(mapping.getKey()))
.map(mapping -> Optional.ofNullable(values.get(mapping.getKey()))
.map(value -> {
Variable variable;
switch (mapping.getDataType()) {
case LONG:
try {
variable = new Integer32(Integer.parseInt(value));
break;
} catch (NumberFormatException ignored) {
}
case DOUBLE:
case BOOLEAN:
case STRING:
case JSON:
default:
variable = new OctetString(value);
}
return new VariableBinding(new OID(mapping.getOid()), variable);
})
.orElseGet(() -> new VariableBinding(new OID(mapping.getOid()))))
.collect(Collectors.toList()));
return pdu;
}
public void processResponseEvent(DeviceSessionContext sessionContext, ResponseEvent event) {
((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext);
if (event.getError() != null) {
log.warn("Response error: {}", event.getError().getMessage(), event.getError());
log.warn("SNMP response error: {}", event.getError().toString());
return;
}
@ -232,108 +170,65 @@ public class SnmpTransportService implements TbTransportService {
log.debug("No response from SNMP device {}, requestId: {}", sessionContext.getDeviceId(), event.getRequest().getRequestID());
return;
}
DeviceProfileId deviceProfileId = (DeviceProfileId) event.getUserObject();
log.debug("[{}] Processing SNMP response for device {} with device profile {}: {}",
response.getRequestID(), sessionContext.getDeviceId(), deviceProfileId, response);
responseProcessingExecutor.execute(() -> processResponse(sessionContext, response));
RequestInfo requestInfo = (RequestInfo) event.getUserObject();
responseProcessingExecutor.execute(() -> {
processResponse(sessionContext, response, requestInfo);
});
}
private void processResponse(DeviceSessionContext sessionContext, PDU responsePdu) {
Map<OID, SnmpMapping> mappings = new HashMap<>();
Map<OID, SnmpCommunicationConfig> configs = new HashMap<>();
Map<SnmpCommunicationSpec, JsonObject> responses = new EnumMap<>(SnmpCommunicationSpec.class);
for (SnmpCommunicationConfig config : sessionContext.getProfileTransportConfiguration().getCommunicationConfigs()) {
for (SnmpMapping mapping : config.getMappings()) {
OID oid = new OID(mapping.getOid());
mappings.put(oid, mapping);
configs.put(oid, config);
}
responses.put(config.getSpec(), new JsonObject());
}
for (int i = 0; i < responsePdu.size(); i++) {
VariableBinding variableBinding = responsePdu.get(i);
log.trace("Processing variable binding {}: {}", i, variableBinding);
if (variableBinding.getVariable() instanceof Null) {
log.debug("Response variable is empty");
continue;
}
OID oid = variableBinding.getOid();
if (!mappings.containsKey(oid)) {
log.debug("No SNMP mapping for oid {}", oid);
continue;
}
SnmpCommunicationSpec spec = configs.get(oid).getSpec();
if (!responseProcessors.containsKey(spec)) {
log.debug("No response processor found for spec {}", spec);
continue;
}
SnmpMapping mapping = mappings.get(oid);
processValue(mapping.getKey(), mapping.getDataType(), variableBinding.toValueString(), responses.get(spec));
}
if (responses.values().stream().allMatch(response -> response.entrySet().isEmpty())) {
log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), responsePdu.getRequestID());
private void processResponse(DeviceSessionContext sessionContext, PDU response, RequestInfo requestInfo) {
ResponseProcessor responseProcessor = responseProcessors.get(requestInfo.getCommunicationConfig().getSpec());
if (responseProcessor == null) {
return;
}
responses.forEach((spec, response) -> {
Optional.ofNullable(responseProcessors.get(spec))
.ifPresent(responseProcessor -> {
if (!response.entrySet().isEmpty()) {
responseProcessor.accept(response, sessionContext);
}
});
});
JsonObject responseData = pduMapper.processPdu(response, sessionContext, requestInfo.getCommunicationConfig());
if (responseData.entrySet().isEmpty()) {
log.debug("No values is the SNMP response for device {}. Request id: {}", sessionContext.getDeviceId(), response.getRequestID());
return;
}
responseProcessor.process(responseData, sessionContext);
reportActivity(sessionContext.getSessionInfo());
}
private void configureResponseProcessors() {
responseProcessors.put(SnmpCommunicationSpec.TELEMETRY_QUERYING, (response, sessionContext) -> {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(response);
transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, TransportServiceCallback.EMPTY);
log.debug("Posted telemetry for device {}: {}", sessionContext.getDeviceId(), response);
transportService.process(sessionContext.getSessionInfo(), postTelemetryMsg, null);
log.debug("Posted telemetry for SNMP device {}: {}", sessionContext.getDeviceId(), response);
});
responseProcessors.put(SnmpCommunicationSpec.CLIENT_ATTRIBUTES_QUERYING, (response, sessionContext) -> {
TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(response);
transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, TransportServiceCallback.EMPTY);
log.debug("Posted attributes for device {}: {}", sessionContext.getDeviceId(), response);
transportService.process(sessionContext.getSessionInfo(), postAttributesMsg, null);
log.debug("Posted attributes for SNMP device {}: {}", sessionContext.getDeviceId(), response);
});
responseProcessors.put(SnmpCommunicationSpec.TO_DEVICE_RPC_RESPONSE_QUERYING, (response, sessionContext) -> {
String rpcResponse = response.get(ToDeviceRpcResponseQueryingSnmpCommunicationConfig.RPC_RESPONSE_KEY_NAME).getAsString();
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
.setPayload(rpcResponse)
.build();
transportService.process(sessionContext.getSessionInfo(), rpcResponseMsg, null);
log.debug("Processed RPC response from device {}: {}", sessionContext.getDeviceId(), rpcResponse);
});
// responseProcessors.put(, (response, sessionContext) -> {
// TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(sessionContext.getDeviceId(), response);
// transportService.process(sessionContext.getSessionInfo(), claimDeviceMsg, null);
// });
}
private void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
transportService.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(false)
.setRpcSubscription(false)
.setAttributeSubscription(true)
.setRpcSubscription(true)
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
}
private void processValue(String key, DataType dataType, String value, JsonObject result) {
if (StringUtils.isEmpty(value)) return;
switch (dataType) {
case LONG:
result.addProperty(key, Long.parseLong(value));
break;
case BOOLEAN:
result.addProperty(key, Boolean.parseBoolean(value));
break;
case DOUBLE:
result.addProperty(key, Double.parseDouble(value));
break;
default:
result.addProperty(key, value);
}
}
@Override
public String getName() {
@ -358,4 +253,15 @@ public class SnmpTransportService implements TbTransportService {
}
log.info("SNMP transport stopped!");
}
@Data
private static class RequestInfo {
private final DeviceProfileId deviceProfileId;
private final SnmpCommunicationConfig communicationConfig;
}
private interface ResponseProcessor {
void process(JsonObject responseData, DeviceSessionContext sessionContext);
}
}

View File

@ -27,6 +27,8 @@ import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfigu
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig;
import org.thingsboard.server.common.data.transport.snmp.config.impl.ToDeviceRpcCommandSettingSnmpCommunicationConfig;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
@ -36,13 +38,13 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponse
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.transport.snmp.SnmpAuthService;
import org.thingsboard.server.transport.snmp.SnmpTransportContext;
import org.thingsboard.server.transport.snmp.service.SnmpTransportService;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
@ -64,16 +66,12 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
private final SnmpTransportContext snmpTransportContext;
@Getter
@Setter
private long previousRequestExecutedAt = 0;
private final AtomicInteger msgIdSeq = new AtomicInteger(0);
@Getter
private boolean isActive = true;
@Getter
@Setter
private List<ScheduledFuture<?>> queryingTasks = new LinkedList<>();
private final List<ScheduledFuture<?>> queryingTasks = new LinkedList<>();
public DeviceSessionContext(Device device, DeviceProfile deviceProfile, String token,
SnmpDeviceProfileTransportConfiguration profileTransportConfiguration,
@ -116,7 +114,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
public void initializeTarget(SnmpDeviceProfileTransportConfiguration profileTransportConfig, SnmpDeviceTransportConfiguration deviceTransportConfig) throws Exception {
log.trace("Initializing target for SNMP session of device {}", device);
this.target = snmpTransportContext.getSnmpAuthService().setUpSnmpTarget(profileTransportConfig, deviceTransportConfig);
log.info("SNMP target initialized: {}", target);
log.debug("SNMP target initialized: {}", target);
}
public void close() {
@ -138,20 +136,14 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onAttributeUpdate(AttributeUpdateNotificationMsg attributeUpdateNotification) {
profileTransportConfiguration.getCommunicationConfigs().stream()
.filter(config -> config.getSpec() == SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING)
.findFirst()
getCommunicationConfigForSpec(SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING)
.ifPresent(communicationConfig -> {
Map<String, String> sharedAttributes = JsonConverter.toJson(attributeUpdateNotification).entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().isJsonPrimitive() ? entry.getValue().getAsString() : entry.getValue().toString()
));
try {
snmpTransportContext.getSnmpTransportService().sendRequest(this, communicationConfig, sharedAttributes);
} catch (Exception e) {
log.error("Failed to send request with shared attributes to SNMP device {}: {}", getDeviceId(), e.getMessage());
}
snmpTransportContext.getSnmpTransportService().sendRequest(this, communicationConfig, sharedAttributes);
});
}
@ -161,9 +153,23 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
getCommunicationConfigForSpec(SnmpCommunicationSpec.TO_DEVICE_RPC_COMMAND_SETTING)
.ifPresent(communicationConfig -> {
String value = JsonConverter.toJson(toDeviceRequest, true).toString();
snmpTransportContext.getSnmpTransportService().sendRequest(
this, communicationConfig,
Map.of(ToDeviceRpcCommandSettingSnmpCommunicationConfig.RPC_COMMAND_KEY_NAME, value)
);
});
}
@Override
public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) {
}
private Optional<SnmpCommunicationConfig> getCommunicationConfigForSpec(SnmpCommunicationSpec spec) {
return profileTransportConfiguration.getCommunicationConfigs().stream()
.filter(config -> config.getSpec() == spec)
.findFirst();
}
}