Merge pull request #9954 from thingsboard/improvements/monitoring

Improvements for monitoring service
This commit is contained in:
Andrew Shvayka 2024-01-15 16:13:12 +02:00 committed by GitHub
commit b1d1c7e4eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 111 additions and 63 deletions

View File

@ -23,6 +23,10 @@ public interface MonitoringTarget {
String getBaseUrl(); String getBaseUrl();
/**
 * Returns the name of the queue associated with this monitoring target.
 *
 * @return {@code "Main"} unless the implementing class overrides this method
 */
default String getQueue() {
return "Main";
}
boolean isCheckDomainIps(); boolean isCheckDomainIps();
} }

View File

@ -22,10 +22,15 @@ public class TransportInfo {
private final TransportType transportType; private final TransportType transportType;
private final String baseUrl; private final String baseUrl;
private final String queue;
@Override @Override
public String toString() { public String toString() {
return String.format("%s transport (%s)", transportType, baseUrl); if (queue.equals("Main")) {
return String.format("*%s* (%s)", transportType.getName(), baseUrl);
} else {
return String.format("*%s* (%s) _%s_", transportType.getName(), baseUrl, queue);
}
} }
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.monitoring.config.transport; package org.thingsboard.monitoring.config.transport;
import lombok.Data; import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.monitoring.config.MonitoringTarget; import org.thingsboard.monitoring.config.MonitoringTarget;
import java.util.UUID; import java.util.UUID;
@ -25,6 +26,7 @@ public class TransportMonitoringTarget implements MonitoringTarget {
private String baseUrl; private String baseUrl;
private DeviceConfig device; // set manually during initialization private DeviceConfig device; // set manually during initialization
private String queue;
private boolean checkDomainIps; private boolean checkDomainIps;
@Override @Override
@ -32,4 +34,8 @@ public class TransportMonitoringTarget implements MonitoringTarget {
return device.getId(); return device.getId();
} }
/**
 * Returns the queue configured for this transport target.
 *
 * @return the configured {@code queue}, or {@code "Main"} when it is {@code null} or empty
 */
@Override
public String getQueue() {
    // Falls back to the same name MonitoringTarget#getQueue() returns by default.
    return StringUtils.defaultIfEmpty(queue, "Main");
}
} }

View File

@ -27,11 +27,12 @@ import org.thingsboard.monitoring.service.transport.impl.MqttTransportHealthChec
@Getter @Getter
public enum TransportType { public enum TransportType {
MQTT(MqttTransportHealthChecker.class), MQTT("MQTT", MqttTransportHealthChecker.class),
COAP(CoapTransportHealthChecker.class), COAP("CoAP",CoapTransportHealthChecker.class),
HTTP(HttpTransportHealthChecker.class), HTTP("HTTP", HttpTransportHealthChecker.class),
LWM2M(Lwm2mTransportHealthChecker.class); LWM2M("LwM2M", Lwm2mTransportHealthChecker.class);
private final String name;
private final Class<? extends TransportHealthChecker<?>> serviceClass; private final Class<? extends TransportHealthChecker<?>> serviceClass;
} }

View File

@ -34,7 +34,7 @@ public class HighLatencyNotification implements Notification {
StringBuilder text = new StringBuilder(); StringBuilder text = new StringBuilder();
text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n");
highLatencies.forEach(latency -> { highLatencies.forEach(latency -> {
text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue())); text.append(String.format("[%s] *%s*\n", latency.getKey(), latency.getFormattedValue()));
}); });
return text.toString(); return text.toString();
} }

View File

@ -43,7 +43,7 @@ public class ServiceFailureNotification implements Notification {
if (errorMsg == null) { if (errorMsg == null) {
errorMsg = error.getClass().getSimpleName(); errorMsg = error.getClass().getSimpleName();
} }
return String.format("[%s] Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount); return String.format("%s - Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount);
} }
} }

View File

@ -25,7 +25,7 @@ public class ServiceRecoveryNotification implements Notification {
@Override @Override
public String getText() { public String getText() {
return String.format("[%s] is OK", serviceKey); return String.format("%s is OK", serviceKey);
} }
} }

View File

@ -17,6 +17,8 @@ package org.thingsboard.monitoring.notification;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.monitoring.data.notification.Notification; import org.thingsboard.monitoring.data.notification.Notification;
import org.thingsboard.monitoring.notification.channels.NotificationChannel; import org.thingsboard.monitoring.notification.channels.NotificationChannel;
@ -24,7 +26,6 @@ import org.thingsboard.monitoring.notification.channels.NotificationChannel;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@ -34,15 +35,20 @@ public class NotificationService {
private final List<NotificationChannel> notificationChannels; private final List<NotificationChannel> notificationChannels;
private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(); private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor();
public void sendNotification(Notification notification) { @Value("${monitoring.notifications.message_prefix}")
forEachNotificationChannel(notificationChannel -> notificationChannel.sendNotification(notification)); private String messagePrefix;
}
private void forEachNotificationChannel(Consumer<NotificationChannel> function) { public void sendNotification(Notification notification) {
String message;
if (StringUtils.isEmpty(messagePrefix)) {
message = notification.getText();
} else {
message = messagePrefix + System.lineSeparator() + notification.getText();
}
notificationChannels.forEach(notificationChannel -> { notificationChannels.forEach(notificationChannel -> {
notificationExecutor.submit(() -> { notificationExecutor.submit(() -> {
try { try {
function.accept(notificationChannel); notificationChannel.sendNotification(message);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e); log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e);
} }

View File

@ -15,10 +15,8 @@
*/ */
package org.thingsboard.monitoring.notification.channels; package org.thingsboard.monitoring.notification.channels;
import org.thingsboard.monitoring.data.notification.Notification;
public interface NotificationChannel { public interface NotificationChannel {
void sendNotification(Notification notification); void sendNotification(String message);
} }

View File

@ -21,7 +21,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.thingsboard.monitoring.data.notification.Notification;
import org.thingsboard.monitoring.notification.channels.NotificationChannel; import org.thingsboard.monitoring.notification.channels.NotificationChannel;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -29,11 +28,11 @@ import java.time.Duration;
import java.util.Map; import java.util.Map;
@Component @Component
@ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true") @ConditionalOnProperty(value = "monitoring.notifications.slack.enabled", havingValue = "true")
@Slf4j @Slf4j
public class SlackNotificationChannel implements NotificationChannel { public class SlackNotificationChannel implements NotificationChannel {
@Value("${monitoring.notification_channels.slack.webhook_url}") @Value("${monitoring.notifications.slack.webhook_url}")
private String webhookUrl; private String webhookUrl;
private RestTemplate restTemplate; private RestTemplate restTemplate;
@ -47,11 +46,7 @@ public class SlackNotificationChannel implements NotificationChannel {
} }
@Override @Override
public void sendNotification(Notification notification) { public void sendNotification(String message) {
sendNotification(notification.getText());
}
private void sendNotification(String message) {
restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class); restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class);
} }

View File

@ -29,6 +29,8 @@ import org.thingsboard.monitoring.service.BaseHealthChecker;
import org.thingsboard.monitoring.util.ResourceUtils; import org.thingsboard.monitoring.util.ResourceUtils;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials; import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials; import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
@ -38,6 +40,9 @@ import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@ -45,27 +50,19 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@Slf4j @Slf4j
public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> { public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> {
private static final String DEFAULT_DEVICE_NAME = "[Monitoring] %s transport (%s)";
private static final String DEFAULT_PROFILE_NAME = "[Monitoring] %s";
public TransportHealthChecker(C config, TransportMonitoringTarget target) { public TransportHealthChecker(C config, TransportMonitoringTarget target) {
super(config, target); super(config, target);
} }
@Override @Override
protected void initialize(TbClient tbClient) { protected void initialize(TbClient tbClient) {
String deviceName = String.format(DEFAULT_DEVICE_NAME, config.getTransportType(), target.getBaseUrl()); Device device = getOrCreateDevice(tbClient);
Device device = tbClient.getTenantDevice(deviceName)
.orElseGet(() -> {
log.info("Creating new device '{}'", deviceName);
return createDevice(config.getTransportType(), deviceName, tbClient);
});
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId()) DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId())
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId())); .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId()));
DeviceConfig deviceConfig = new DeviceConfig(); DeviceConfig deviceConfig = new DeviceConfig();
deviceConfig.setId(device.getId().toString()); deviceConfig.setId(device.getId().toString());
deviceConfig.setName(deviceName); deviceConfig.setName(device.getName());
deviceConfig.setCredentials(credentials); deviceConfig.setCredentials(credentials);
target.setDevice(deviceConfig); target.setDevice(deviceConfig);
} }
@ -77,51 +74,43 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
@Override @Override
protected Object getInfo() { protected Object getInfo() {
return new TransportInfo(getTransportType(), target.getBaseUrl()); return new TransportInfo(getTransportType(), target.getBaseUrl(), target.getQueue());
} }
@Override @Override
protected String getKey() { protected String getKey() {
return getTransportType().name().toLowerCase() + "Transport"; return getTransportType().name().toLowerCase() + (target.getQueue().equals("Main") ? "" : target.getQueue()) + "Transport";
} }
protected abstract TransportType getTransportType(); protected abstract TransportType getTransportType();
private Device createDevice(TransportType transportType, String name, TbClient tbClient) { private Device getOrCreateDevice(TbClient tbClient) {
Device device = new Device(); TransportType transportType = config.getTransportType();
device.setName(name); String deviceName = String.format("%s (%s) - %s", transportType.getName(), target.getQueue(), target.getBaseUrl());
Device device = tbClient.getTenantDevice(deviceName).orElse(null);
if (device != null) {
return device;
}
log.info("Creating new device '{}'", deviceName);
device = new Device();
device.setName(deviceName);
DeviceCredentials credentials = new DeviceCredentials(); DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20)); credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20));
DeviceData deviceData = new DeviceData(); DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration()); deviceData.setConfiguration(new DefaultDeviceConfiguration());
DeviceProfile deviceProfile = getOrCreateDeviceProfile(tbClient);
device.setType(deviceProfile.getName());
device.setDeviceProfileId(deviceProfile.getId());
if (transportType != TransportType.LWM2M) { if (transportType != TransportType.LWM2M) {
device.setType("default");
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
} else { } else {
tbClient.getResources(new PageLink(1, 0, "lwm2m monitoring")).getData()
.stream().findFirst()
.orElseGet(() -> {
TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
log.info("Creating LwM2M resource");
return tbClient.saveResource(newResource);
});
String profileName = String.format(DEFAULT_PROFILE_NAME, transportType);
DeviceProfile profile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData()
.stream().findFirst()
.orElseGet(() -> {
DeviceProfile newProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
newProfile.setName(profileName);
log.info("Creating LwM2M device profile");
return tbClient.saveDeviceProfile(newProfile);
});
device.setType(profileName);
device.setDeviceProfileId(profile.getId());
deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration()); deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS); credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials(); LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials();
NoSecClientCredential client = new NoSecClientCredential(); NoSecClientCredential client = new NoSecClientCredential();
@ -133,7 +122,42 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
lwm2mCreds.setBootstrap(bootstrap); lwm2mCreds.setBootstrap(bootstrap);
credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds)); credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds));
} }
return tbClient.saveDeviceWithCredentials(device, credentials).get(); return tbClient.saveDeviceWithCredentials(device, credentials).get();
} }
/**
 * Finds the device profile used for this checker's monitoring device, creating and saving it when absent.
 *
 * <p>The profile name is {@code "<transport name> (<queue>)"}. The first profile returned by
 * {@code tbClient.getDeviceProfiles} for that name is reused as is. Otherwise a new profile is prepared:
 * for LwM2M it is loaded from {@code lwm2m/device_profile.json} (after saving {@code lwm2m/resource.json}
 * if no resource is found for "lwm2m monitoring"); for every other transport a default profile is built.
 * The new profile gets the computed name and the target queue as its default queue name.
 *
 * @param tbClient client used to look up and save device profiles and resources
 * @return the existing profile, or the newly saved one
 */
private DeviceProfile getOrCreateDeviceProfile(TbClient tbClient) {
    TransportType type = config.getTransportType();
    String name = String.format("%s (%s)", type.getName(), target.getQueue());

    // NOTE(review): PageLink(1, 0, text) presumably requests a single-item first page filtered by text - confirm.
    DeviceProfile existing = tbClient.getDeviceProfiles(new PageLink(1, 0, name)).getData()
            .stream().findFirst().orElse(null);
    if (existing != null) {
        return existing;
    }

    log.info("Creating new device profile '{}'", name);
    DeviceProfile created;
    if (type == TransportType.LWM2M) {
        // The lookup result itself is not needed; the resource only has to exist before the profile is saved.
        boolean resourceMissing = !tbClient.getResources(new PageLink(1, 0, "lwm2m monitoring")).getData()
                .stream().findFirst().isPresent();
        if (resourceMissing) {
            TbResource resource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
            log.info("Creating LwM2M resource");
            tbClient.saveResource(resource);
        }
        created = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
    } else {
        created = new DeviceProfile();
        created.setType(DeviceProfileType.DEFAULT);
        created.setTransportType(DeviceTransportType.DEFAULT);
        DeviceProfileData data = new DeviceProfileData();
        data.setConfiguration(new DefaultDeviceProfileConfiguration());
        data.setTransportConfiguration(new DefaultDeviceProfileTransportConfiguration());
        created.setProfileData(data);
    }

    created.setName(name);
    created.setDefaultQueueName(target.getQueue());
    return tbClient.saveDeviceProfile(created);
}
} }

View File

@ -38,7 +38,7 @@ monitoring:
check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}' check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}'
# Failures threshold for notifying # Failures threshold for notifying
failures_threshold: '${FAILURES_THRESHOLD:2}' failures_threshold: '${FAILURES_THRESHOLD:1}'
# Notify after each REPEATED_FAILURE_NOTIFICATION subsequent failures, 0 to notify only once on first failure # Notify after each REPEATED_FAILURE_NOTIFICATION subsequent failures, 0 to notify only once on first failure
repeated_failure_notification: '${REPEATED_FAILURE_NOTIFICATION:4}' repeated_failure_notification: '${REPEATED_FAILURE_NOTIFICATION:4}'
@ -53,6 +53,8 @@ monitoring:
targets: targets:
# MQTT transport base url, tcp://DOMAIN:1883 by default # MQTT transport base url, tcp://DOMAIN:1883 by default
- base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}'
# Queue set as the default queue of the device profile created for this target ('Main' when left empty)
queue: '${MQTT_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
@ -66,6 +68,8 @@ monitoring:
targets: targets:
# CoAP transport base url, coap://DOMAIN by default # CoAP transport base url, coap://DOMAIN by default
- base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}' - base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}'
# Queue set as the default queue of the device profile created for this target ('Main' when left empty)
queue: '${COAP_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
@ -79,6 +83,8 @@ monitoring:
targets: targets:
# HTTP transport base url, http://DOMAIN by default # HTTP transport base url, http://DOMAIN by default
- base_url: '${HTTP_TRANSPORT_BASE_URL:http://${monitoring.domain}}' - base_url: '${HTTP_TRANSPORT_BASE_URL:http://${monitoring.domain}}'
# Queue set as the default queue of the device profile created for this target ('Main' when left empty)
queue: '${HTTP_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
@ -92,12 +98,15 @@ monitoring:
targets: targets:
# LwM2M transport base url, coap://DOMAIN:5685 by default # LwM2M transport base url, coap://DOMAIN:5685 by default
- base_url: '${LWM2M_TRANSPORT_BASE_URL:coap://${monitoring.domain}:5685}' - base_url: '${LWM2M_TRANSPORT_BASE_URL:coap://${monitoring.domain}:5685}'
# Queue set as the default queue of the device profile created for this target ('Main' when left empty)
queue: '${LWM2M_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
# monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.
notification_channels: notifications:
message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}'
slack: slack:
# Enable notifying via Slack # Enable notifying via Slack
enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}' enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}'