Monitoring: support for dynamic change of load-balancers list

This commit is contained in:
ViacheslavKlimov 2023-11-27 14:01:23 +02:00
parent 24241010b7
commit 98a87bfd33
2 changed files with 75 additions and 21 deletions

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.monitoring.service;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -30,13 +31,17 @@ import org.thingsboard.monitoring.util.TbStopWatch;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@RequiredArgsConstructor
@Slf4j
public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends MonitoringTarget> {
@Getter
protected final C config;
@Getter
protected final T target;
private Object info;
@ -48,6 +53,9 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
@Value("${monitoring.check_timeout_ms}")
private int resultCheckTimeoutMs;
// Additional health checkers attached to this one; each of them is checked
// right after this checker's own check has run.
// NOTE(review): the map appears to be keyed by the associate's URL - confirm against the code that fills it.
@Getter
private final Map<String, BaseHealthChecker<C, T>> associates = new HashMap<>();
public static final String TEST_TELEMETRY_KEY = "testData";
@PostConstruct
@ -84,6 +92,10 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
} catch (Exception e) {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
}
associates.values().forEach(healthChecker -> {
healthChecker.check(wsClient);
});
}
private void checkWsUpdate(WsClient wsClient, String testValue) {
@ -99,7 +111,6 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
reporter.reportLatency(Latencies.wsUpdate(getKey()), stopWatch.getTime());
}
protected abstract void initClient() throws Exception;
protected abstract String createTestPayload(String testValue);

View File

@ -32,9 +32,12 @@ import org.thingsboard.monitoring.util.TbStopWatch;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@ -66,34 +69,24 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
tbClient.logIn();
configs.forEach(config -> {
config.getTargets().forEach(target -> {
initHealthChecker(target, config);
BaseHealthChecker<C, T> healthChecker = initHealthChecker(target, config);
healthCheckers.add(healthChecker);
if (target.isCheckDomainIps()) {
initIpsHealthCheckers(target, config);
getAssociatedUrls(target.getBaseUrl()).forEach(url -> {
healthChecker.getAssociates().put(url, initHealthChecker(createTarget(url), config));
});
}
});
});
}
private void initHealthChecker(T target, C config) {
private BaseHealthChecker<C, T> initHealthChecker(T target, C config) {
BaseHealthChecker<C, T> healthChecker = (BaseHealthChecker<C, T>) createHealthChecker(config, target);
log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl());
healthChecker.initialize(tbClient);
devices.add(target.getDeviceId());
healthCheckers.add(healthChecker);
}
@SneakyThrows
private void initIpsHealthCheckers(T target, C config) {
URI baseUrl = new URI(target.getBaseUrl());
String domain = baseUrl.getHost();
Set<String> ips = Arrays.stream(InetAddress.getAllByName(domain))
.map(InetAddress::getHostAddress)
.collect(Collectors.toSet());
for (String ip : ips) {
String url = new URI(baseUrl.getScheme(), null, ip, baseUrl.getPort(), "", null, null).toString();
initHealthChecker(createTarget(url), config);
}
return healthChecker;
}
public final void runChecks() {
@ -108,9 +101,8 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply();
for (BaseHealthChecker<C, T> healthChecker : healthCheckers) {
healthChecker.check(wsClient);
check(healthChecker, wsClient);
}
}
reporter.reportLatencies(tbClient);
@ -124,6 +116,57 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
}
}
/**
 * Runs the given health checker and, when its target has domain IP checking enabled,
 * reconciles the checker's associates with the URLs currently resolved for the target's base URL.
 *
 * <p>An associate is initialized for every resolved URL that does not have one yet, and the
 * associate of every URL that is no longer resolved is stopped and removed from the map.
 *
 * @param healthChecker the main health checker whose associates are kept in sync
 * @param wsClient the client handed over to the health checker's own check
 * @throws Exception if stopping an outdated associate fails; failures while resolving
 *                   the associated URLs are propagated as well
 */
private void check(BaseHealthChecker<C, T> healthChecker, WsClient wsClient) throws Exception {
    healthChecker.check(wsClient);

    T target = healthChecker.getTarget();
    if (!target.isCheckDomainIps()) {
        return;
    }

    Set<String> associatedUrls = getAssociatedUrls(target.getBaseUrl());
    Map<String, BaseHealthChecker<C, T>> associates = healthChecker.getAssociates();
    // Snapshot of the known URLs, so the map can be modified while the old URLs are iterated.
    Set<String> prevAssociatedUrls = new HashSet<>(associates.keySet());

    boolean changed = false;
    for (String url : associatedUrls) {
        if (!prevAssociatedUrls.contains(url)) {
            BaseHealthChecker<C, T> associate = initHealthChecker(createTarget(url), healthChecker.getConfig());
            associates.put(url, associate);
            changed = true;
        }
    }
    for (String url : prevAssociatedUrls) {
        if (!associatedUrls.contains(url)) {
            // Stop the outdated associate itself, not the main health checker that owns it:
            // stopping the owner would destroy its client and drop its device from the subscription list.
            BaseHealthChecker<C, T> outdatedAssociate = associates.get(url);
            stopHealthChecker(outdatedAssociate);
            // Removed only after a successful stop, so a failed stop leaves the entry in place.
            associates.remove(url);
            changed = true;
        }
    }

    if (changed) {
        log.info("Updated IPs for {}: {} (old list: {})", target.getBaseUrl(), associatedUrls, prevAssociatedUrls);
    }
}
/**
 * Resolves the host of the given base URL to all of its addresses and builds one URL per address.
 *
 * <p>Each resulting URL keeps the scheme and port of the base URL and uses the resolved
 * address as host; user info, path, query and fragment are not carried over.
 *
 * @param baseUrl the URL whose host is resolved
 * @return the set of address-based URLs, one per distinct resolved address
 */
@SneakyThrows
private Set<String> getAssociatedUrls(String baseUrl) {
    URI base = new URI(baseUrl);
    InetAddress[] resolved = InetAddress.getAllByName(base.getHost());

    Set<String> urls = new HashSet<>();
    for (InetAddress address : resolved) {
        String ip = address.getHostAddress();
        try {
            URI ipUrl = new URI(base.getScheme(), null, ip, base.getPort(), "", null, null);
            urls.add(ipUrl.toString());
        } catch (URISyntaxException e) {
            // Kept as an unchecked wrapper, exactly as callers of this method see it today.
            throw new RuntimeException(e);
        }
    }
    return urls;
}
/**
 * Stops the given health checker: destroys its client, removes its target's device id
 * from the tracked devices and logs the stop.
 *
 * @param healthChecker the health checker to stop
 * @throws Exception if destroying the checker's client fails
 */
private void stopHealthChecker(BaseHealthChecker<C, T> healthChecker) throws Exception {
    healthChecker.destroyClient();

    T stoppedTarget = healthChecker.getTarget();
    devices.remove(stoppedTarget.getDeviceId());

    String checkerName = healthChecker.getClass().getSimpleName();
    log.info("Stopped {} for {}", checkerName, stoppedTarget.getBaseUrl());
}
/**
 * Creates the health checker for the given config and target.
 *
 * <p>The returned instance is cast to {@code BaseHealthChecker<C, T>} and initialized
 * by {@code initHealthChecker}.
 *
 * @param config the monitoring config the checker is created for
 * @param target the target the checker will check
 * @return a new health checker for the target
 */
protected abstract BaseHealthChecker<?, ?> createHealthChecker(C config, T target);
/**
 * Creates a target for the given base URL.
 *
 * <p>Used to build the targets of the address-based URLs that a domain resolves to.
 *
 * @param baseUrl the base URL of the new target
 * @return a target pointing at {@code baseUrl}
 */
protected abstract T createTarget(String baseUrl);