refactoring

This commit is contained in:
dashevchenko 2023-07-04 18:18:23 +03:00
parent 5acd5b3658
commit fee8aa359a
5 changed files with 151 additions and 56 deletions

View File

@ -80,6 +80,7 @@ import javax.servlet.http.HttpServletRequest;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -168,7 +169,7 @@ public class DeviceController extends BaseController {
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@RequestMapping(value = "/device/{deviceId}/commands", method = RequestMethod.GET) @RequestMapping(value = "/device/{deviceId}/commands", method = RequestMethod.GET)
@ResponseBody @ResponseBody
public List<String> getDevicePublishTelemetryCommands(@ApiParam(value = DEVICE_ID_PARAM_DESCRIPTION) public Map<String, String> getDevicePublishTelemetryCommands(@ApiParam(value = DEVICE_ID_PARAM_DESCRIPTION)
@PathVariable(DEVICE_ID) String strDeviceId, HttpServletRequest request) throws ThingsboardException, URISyntaxException { @PathVariable(DEVICE_ID) String strDeviceId, HttpServletRequest request) throws ThingsboardException, URISyntaxException {
checkParameter(DEVICE_ID, strDeviceId); checkParameter(DEVICE_ID, strDeviceId);
DeviceId deviceId = new DeviceId(toUUID(strDeviceId)); DeviceId deviceId = new DeviceId(toUUID(strDeviceId));

View File

@ -775,6 +775,10 @@ transport:
worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}" worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}" max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
so_keep_alive: "${NETTY_SO_KEEPALIVE:false}" so_keep_alive: "${NETTY_SO_KEEPALIVE:false}"
# Mqtt device connectivity host to publish telemetry
device_connectivity_host: "${MQTT_DEVICE_CONNECTIVITY_HOST:localhost}"
# Mqtt device connectivity port to publish telemetry
device_connectivity_port: "${MQTT_DEVICE_CONNECTIVITY_PORT:1883}"
# MQTT SSL configuration # MQTT SSL configuration
ssl: ssl:
# Enable/disable SSL support # Enable/disable SSL support
@ -785,6 +789,10 @@ transport:
bind_port: "${MQTT_SSL_BIND_PORT:8883}" bind_port: "${MQTT_SSL_BIND_PORT:8883}"
# SSL protocol: See https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms # SSL protocol: See https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#sslcontext-algorithms
protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}" protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
# Mqtt ssl device connectivity host to publish telemetry
device_connectivity_host: "${MQTT_DEVICE_CONNECTIVITY_HOST:localhost}"
# Mqtt ssl device connectivity port to publish telemetry
device_connectivity_port: "${MQTT_DEVICE_CONNECTIVITY_PORT:8883}"
# Server SSL credentials # Server SSL credentials
credentials: credentials:
# Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore) # Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore)
@ -821,6 +829,10 @@ transport:
piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}" piggyback_timeout: "${COAP_PIGGYBACK_TIMEOUT:500}"
psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}" psm_activity_timer: "${COAP_PSM_ACTIVITY_TIMER:10000}"
paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}" paging_transmission_window: "${COAP_PAGING_TRANSMISSION_WINDOW:10000}"
# Coap device connectivity host to publish telemetry
device_connectivity_host: "${COAP_DEVICE_CONNECTIVITY_HOST:localhost}"
# Coap device connectivity port to publish telemetry
device_connectivity_port: "${COAP_DEVICE_CONNECTIVITY_PORT:5683}"
dtls: dtls:
# Enable/disable DTLS 1.2 support # Enable/disable DTLS 1.2 support
enabled: "${COAP_DTLS_ENABLED:false}" enabled: "${COAP_DTLS_ENABLED:false}"
@ -830,6 +842,10 @@ transport:
bind_address: "${COAP_DTLS_BIND_ADDRESS:0.0.0.0}" bind_address: "${COAP_DTLS_BIND_ADDRESS:0.0.0.0}"
# CoAP DTLS bind port # CoAP DTLS bind port
bind_port: "${COAP_DTLS_BIND_PORT:5684}" bind_port: "${COAP_DTLS_BIND_PORT:5684}"
# Coap DTLS device connectivity host to publish telemetry
device_connectivity_host: "${COAP_DEVICE_CONNECTIVITY_HOST:localhost}"
# Coap DTLS device connectivity port to publish telemetry
device_connectivity_port: "${COAP_DEVICE_CONNECTIVITY_PORT:5684}"
# Server DTLS credentials # Server DTLS credentials
credentials: credentials:
# Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore) # Server credentials type (PEM - pem certificate file; KEYSTORE - java keystore)

View File

@ -38,13 +38,14 @@ import org.thingsboard.server.dao.entity.EntityDaoService;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
public interface DeviceService extends EntityDaoService { public interface DeviceService extends EntityDaoService {
DeviceInfo findDeviceInfoById(TenantId tenantId, DeviceId deviceId); DeviceInfo findDeviceInfoById(TenantId tenantId, DeviceId deviceId);
List<String> findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException; Map<String, String> findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException;
Device findDeviceById(TenantId tenantId, DeviceId deviceId); Device findDeviceById(TenantId tenantId, DeviceId deviceId);

View File

@ -0,0 +1,9 @@
package org.thingsboard.server.dao.device;
import lombok.Data;
@Data
public class DeviceConnectivityConfiguration {
private String deviceConnectivityHost;
private Integer deviceConnectivityPort;
}

View File

@ -20,6 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
@ -51,6 +54,7 @@ import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfigu
import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration; import org.thingsboard.server.common.data.device.profile.CoapDeviceTypeConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration; import org.thingsboard.server.common.data.device.profile.DefaultCoapDeviceTypeConfiguration;
import org.thingsboard.server.common.data.device.profile.EfentoCoapDeviceTypeConfiguration;
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
@ -79,11 +83,11 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.PaginatedRemover;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -124,6 +128,46 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
@Autowired @Autowired
private EntityCountService countService; private EntityCountService countService;
@Bean
@ConfigurationProperties(prefix = "transport.mqtt")
public DeviceConnectivityConfiguration mqttDeviceConnectivityProperties() {
return new DeviceConnectivityConfiguration ();
}
@Bean
@ConfigurationProperties(prefix = "transport.mqtt.ssl")
public DeviceConnectivityConfiguration mqttsDeviceConnectivityProperties() {
return new DeviceConnectivityConfiguration ();
}
@Bean
@ConfigurationProperties(prefix = "transport.coap")
public DeviceConnectivityConfiguration coapDeviceConnectivityProperties() {
return new DeviceConnectivityConfiguration ();
}
@Bean
@ConfigurationProperties(prefix = "transport.coap.dtls")
public DeviceConnectivityConfiguration coapsDeviceConnectivityProperties() {
return new DeviceConnectivityConfiguration ();
}
@Autowired
@Qualifier("mqttDeviceConnectivityProperties")
private DeviceConnectivityConfiguration mqttProperties;
@Autowired
@Qualifier("mqttsDeviceConnectivityProperties")
private DeviceConnectivityConfiguration mqttsProperties;
@Autowired
@Qualifier("coapDeviceConnectivityProperties")
private DeviceConnectivityConfiguration coapProperties;
@Autowired
@Qualifier("coapsDeviceConnectivityProperties")
private DeviceConnectivityConfiguration coapsProperties;
@Override @Override
public DeviceInfo findDeviceInfoById(TenantId tenantId, DeviceId deviceId) { public DeviceInfo findDeviceInfoById(TenantId tenantId, DeviceId deviceId) {
log.trace("Executing findDeviceInfoById [{}]", deviceId); log.trace("Executing findDeviceInfoById [{}]", deviceId);
@ -132,63 +176,66 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
} }
@Override @Override
public List<String> findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException { public Map<String, String> findDevicePublishTelemetryCommands(String baseUrl, Device device) {
DeviceId deviceId = device.getId(); DeviceId deviceId = device.getId();
log.trace("Executing findDevicePublishTelemetryCommands [{}]", deviceId); log.trace("Executing findDevicePublishTelemetryCommands [{}]", deviceId);
validateId(deviceId, INCORRECT_DEVICE_ID + deviceId); validateId(deviceId, INCORRECT_DEVICE_ID + deviceId);
String hostname = new URI(baseUrl).getHost(); DeviceCredentials creds = deviceCredentialsService.findDeviceCredentialsByDeviceId(device.getTenantId(), deviceId);
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(device.getTenantId(), deviceId); DeviceCredentialsType credentialsType = creds.getCredentialsType();
DeviceCredentialsType credentialsType = deviceCredentials.getCredentialsType();
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId()); DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(device.getTenantId(), device.getDeviceProfileId());
DeviceTransportType transportType = deviceProfile.getTransportType();
ArrayList<String> commands = new ArrayList<>(); Map<String, String> commands = new HashMap<>();
switch (deviceProfile.getTransportType()) {
switch (transportType) {
case DEFAULT: case DEFAULT:
switch (credentialsType) { switch (credentialsType) {
case ACCESS_TOKEN: case ACCESS_TOKEN:
commands.add(getMqttAccessTokenCommand(hostname, deviceCredentials) + " -m " + PAYLOAD); commands.put("http", getHttpPublishCommand(baseUrl, creds));
commands.add(getHttpAccessTokenCommand(baseUrl, deviceCredentials)); commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds));
commands.add("echo -n " + PAYLOAD + " | " + getCoapAccessTokenCommand(hostname, deviceCredentials) + " -f-"); commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds));
break; commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds));
commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds)); break;
case MQTT_BASIC: case MQTT_BASIC:
commands.add(getMqttBasicPublishCommand(hostname, deviceCredentials) + " -m " + PAYLOAD); commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds));
commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds));
break; break;
case X509_CERTIFICATE: case X509_CERTIFICATE:
commands.add(getMqttX509Command(hostname) + " -m " + PAYLOAD); commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(), creds));
commands.put("mqtts", getMqttPublishCommand(mqttsProperties.getDeviceConnectivityHost(), mqttsProperties.getDeviceConnectivityPort(), creds));
commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds));
commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds));
break; break;
} }
break; break;
case MQTT: case MQTT:
MqttDeviceProfileTransportConfiguration transportConfiguration = MqttDeviceProfileTransportConfiguration transportConfiguration =
(MqttDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration(); (MqttDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
String topicName = transportConfiguration.getDeviceTelemetryTopic();
TransportPayloadType payloadType = transportConfiguration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); TransportPayloadType payloadType = transportConfiguration.getTransportPayloadTypeConfiguration().getTransportPayloadType();
String payload = (payloadType == TransportPayloadType.PROTOBUF) ? " -f protobufFileName" : " -m " + PAYLOAD; String payload = (payloadType == TransportPayloadType.PROTOBUF) ? " -f protobufFileName" : " -m " + PAYLOAD;
switch (credentialsType) {
case ACCESS_TOKEN: commands.put("mqtt", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(),
commands.add(getMqttAccessTokenCommand(hostname, deviceCredentials) + payload); topicName, creds, payload));
break; commands.put("mqtts", getMqttPublishCommand(mqttProperties.getDeviceConnectivityHost(), mqttProperties.getDeviceConnectivityPort(),
case MQTT_BASIC: topicName, creds, payload));
commands.add(getMqttBasicPublishCommand(hostname, deviceCredentials) + payload);
break;
case X509_CERTIFICATE:
commands.add(getMqttX509Command(hostname) + payload);
break;
}
break; break;
case COAP: case COAP:
CoapDeviceProfileTransportConfiguration coapTransportConfiguration = CoapDeviceProfileTransportConfiguration coapTransportConfiguration =
(CoapDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration(); (CoapDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
CoapDeviceTypeConfiguration coapConfiguration = coapTransportConfiguration.getCoapDeviceTypeConfiguration(); CoapDeviceTypeConfiguration coapConfiguration = coapTransportConfiguration.getCoapDeviceTypeConfiguration();
if (coapConfiguration instanceof DefaultCoapDeviceTypeConfiguration) { if (coapConfiguration instanceof DefaultCoapDeviceTypeConfiguration) {
DefaultCoapDeviceTypeConfiguration configuration = commands.put("coap", getCoapPublishCommand(coapProperties.getDeviceConnectivityHost(), coapProperties.getDeviceConnectivityPort(), creds));
(DefaultCoapDeviceTypeConfiguration) coapTransportConfiguration.getCoapDeviceTypeConfiguration(); commands.put("coaps", getCoapPublishCommand(coapsProperties.getDeviceConnectivityHost(), coapsProperties.getDeviceConnectivityPort(), creds));
TransportPayloadType transportPayloadType = configuration.getTransportPayloadTypeConfiguration().getTransportPayloadType(); } else if (coapConfiguration instanceof EfentoCoapDeviceTypeConfiguration) {
String payloadExample = (transportPayloadType == TransportPayloadType.PROTOBUF) ? " -t binary -f protobufFileName" : " -t json -f jsonFileName"; commands.put("coap", "Not supported");
commands.add(getCoapAccessTokenCommand(hostname, deviceCredentials) + payloadExample); commands.put("coaps", "Not supported");
} }
break; break;
default:
commands.put(transportType.name(), "Not supported");
} }
return commands; return commands;
} }
@ -752,17 +799,28 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
return EntityType.DEVICE; return EntityType.DEVICE;
} }
private String getHttpAccessTokenCommand(String baseurl, DeviceCredentials deviceCredentials) { private String getHttpPublishCommand(String baseurl, DeviceCredentials deviceCredentials) {
return String.format("curl -v -X POST %s/api/v1/%s/telemetry --header Content-Type:application/json --data " + PAYLOAD, baseurl, deviceCredentials.getCredentialsId()); return String.format("curl -v -X POST %s/api/v1/%s/telemetry --header Content-Type:application/json --data " + PAYLOAD,
baseurl, deviceCredentials.getCredentialsId());
} }
private String getMqttAccessTokenCommand(String hostname, DeviceCredentials deviceCredentials) { private String getMqttPublishCommand(String host, Integer port, DeviceCredentials deviceCredentials) {
return String.format("mosquitto_pub -d -q 1 -h %s -t v1/devices/me/telemetry -u %s", hostname, deviceCredentials.getCredentialsId()); return getMqttPublishCommand(host, port, "v1/devices/me/telemetry", deviceCredentials, " -m " + PAYLOAD);
} }
private String getMqttBasicPublishCommand(String hostname, DeviceCredentials deviceCredentials) { private String getMqttPublishCommand(String host, Integer port, String deviceTelemetryTopic, DeviceCredentials deviceCredentials, String payload) {
BasicMqttCredentials credentials = JacksonUtil.fromString(deviceCredentials.getCredentialsValue(), BasicMqttCredentials.class); StringBuilder command = new StringBuilder("mosquitto_pub -d -q 1");
StringBuilder command = new StringBuilder("mosquitto_pub -d -q 1 -h " + hostname + " -p 1883 -t v1/devices/me/telemetry"); command.append(" -h ").append(host);
command.append(" -p ").append(port);
command.append(" -t ").append(deviceTelemetryTopic);
switch (deviceCredentials.getCredentialsType()) {
case ACCESS_TOKEN:
command.append(" -u ").append(deviceCredentials.getCredentialsId());
break;
case MQTT_BASIC:
BasicMqttCredentials credentials = JacksonUtil.fromString(deviceCredentials.getCredentialsValue(),
BasicMqttCredentials.class);
if (credentials != null) { if (credentials != null) {
if (credentials.getClientId() != null) { if (credentials.getClientId() != null) {
command.append(" -i ").append(credentials.getClientId()); command.append(" -i ").append(credentials.getClientId());
@ -774,14 +832,24 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
command.append(" -P ").append(credentials.getPassword()); command.append(" -P ").append(credentials.getPassword());
} }
} }
break;
case X509_CERTIFICATE:
command.append(" --cafile server.pem --key key.pem --cert cert.pem");
}
command.append(payload);
return command.toString(); return command.toString();
} }
private String getMqttX509Command(String hostname) { private String getCoapPublishCommand(String host, Integer port, DeviceCredentials deviceCredentials) {
return String.format("mosquitto_pub --cafile server.pem -d -q 1 -h %s -p 8883 -t v1/devices/me/telemetry --key key.pem --cert cert.pem", hostname); switch (deviceCredentials.getCredentialsType()) {
case ACCESS_TOKEN:
return String.format("coap-client -m post coap://%s:%s/api/v1/%s/telemetry -t json -e %s",
host, port, deviceCredentials.getCredentialsId(), PAYLOAD);
case X509_CERTIFICATE:
return String.format("coap-client-openssl -v 9 -c cert.pem -j key.pem -m POST -t json -e %s " +
"coaps://%s:%s/api/v1/telemetry", PAYLOAD, host, port);
default:
return "Not supported";
} }
private String getCoapAccessTokenCommand(String hostname, DeviceCredentials deviceCredentials) {
return String.format("coap-client -m post coap://%s:5683/api/v1/%s/telemetry", hostname, deviceCredentials.getCredentialsId());
} }
} }