diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java index 0e62670f81..0176b14d54 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTarget.java @@ -21,4 +21,8 @@ public interface MonitoringTarget { UUID getDeviceId(); + String getBaseUrl(); + + boolean isCheckDomainIps(); + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java index 77d702f779..c5f843192c 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringConfig.java @@ -23,9 +23,8 @@ import java.util.List; @Data public abstract class TransportMonitoringConfig implements MonitoringConfig { - private int requestTimeoutMs; - private List targets; + private int requestTimeoutMs; public abstract TransportType getTransportType(); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java index 816f64fbce..e3277af0ff 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java @@ -25,6 +25,7 @@ public class TransportMonitoringTarget implements MonitoringTarget { private String baseUrl; private DeviceConfig device; // set manually during initialization + private boolean checkDomainIps; @Override public UUID getDeviceId() { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java index 3370d42462..52e73a64cd 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latencies.java @@ -25,4 +25,8 @@ public class Latencies { return String.format("%sRequest", key); } + public static String wsUpdate(String key) { + return String.format("%sWsUpdate", key); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java index c64291e30f..4b17bf179f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/Latency.java @@ -15,53 +15,15 @@ */ package org.thingsboard.monitoring.data; -import com.google.common.util.concurrent.AtomicDouble; -import lombok.RequiredArgsConstructor; +import lombok.Data; -import java.util.concurrent.atomic.AtomicInteger; - -@RequiredArgsConstructor +@Data(staticConstructor = "of") public class Latency { - private final String key; - private final AtomicDouble latencySum = new AtomicDouble(); - private final AtomicInteger counter = new AtomicInteger(); + private final double value; - public synchronized void report(double latencyInMs) { - latencySum.addAndGet(latencyInMs); - counter.incrementAndGet(); - } - - public synchronized double getAvg() { - return latencySum.get() / counter.get(); - } - - public boolean isNotEmpty() { - return counter.get() > 0; - } - - public synchronized void reset() { - latencySum.set(0.0); - counter.set(0); - } - - public String getKey() { - return key; - } - - public synchronized Latency snapshot() { - Latency snapshot = new Latency(key); - snapshot.latencySum.set(latencySum.get()); - snapshot.counter.set(counter.get()); - return snapshot; - } - - @Override - public String toString() { - return "Latency{" + - "key='" + key + '\'' + - ", avgLatency=" + getAvg() + - '}'; + public String getFormattedValue() { + return String.format("%,.2f ms", value); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java index 939cb7440f..5744e40d41 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -21,11 +21,11 @@ import java.util.Collection; public class HighLatencyNotification implements Notification { - private final Collection latencies; + private final Collection highLatencies; private final int thresholdMs; - public HighLatencyNotification(Collection latencies, int thresholdMs) { - this.latencies = latencies; + public HighLatencyNotification(Collection highLatencies, int thresholdMs) { + this.highLatencies = highLatencies; this.thresholdMs = thresholdMs; } @@ -33,8 +33,8 @@ public class HighLatencyNotification implements Notification { public String getText() { StringBuilder text = new StringBuilder(); text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); - latencies.forEach(latency -> { - text.append(String.format("[%s] %,.2f ms\n", latency.getKey(), latency.getAvg())); + highLatencies.forEach(latency -> { + text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue())); }); return text.toString(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index affca1ce09..290fd307c6 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -15,6 +15,7 @@ */ package org.thingsboard.monitoring.service; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -30,13 +31,17 @@ import org.thingsboard.monitoring.util.TbStopWatch; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; @RequiredArgsConstructor @Slf4j public abstract class BaseHealthChecker { + @Getter protected final C config; + @Getter protected final T target; private Object info; @@ -48,6 +53,9 @@ public abstract class BaseHealthChecker> associates = new HashMap<>(); + public static final String TEST_TELEMETRY_KEY = "testData"; @PostConstruct @@ -84,6 +92,10 @@ public abstract class BaseHealthChecker { + healthChecker.check(wsClient); + }); } private void checkWsUpdate(WsClient wsClient, String testValue) { @@ -96,10 +108,9 @@ public abstract class BaseHealthChecker, T extends MonitoringTarget> { - @Autowired + @Autowired(required = false) private List configs; private final List> healthCheckers = new LinkedList<>(); private final List devices = new LinkedList<>(); @@ -54,18 +63,32 @@ public abstract class BaseMonitoringService, T ext @PostConstruct private void init() { + if (configs == null || configs.isEmpty()) { + return; + } tbClient.logIn(); configs.forEach(config -> { config.getTargets().forEach(target -> { - BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target); - log.info("Initializing {}", healthChecker.getClass().getSimpleName()); - healthChecker.initialize(tbClient); - devices.add(target.getDeviceId()); + BaseHealthChecker healthChecker = initHealthChecker(target, config); healthCheckers.add(healthChecker); + + if (target.isCheckDomainIps()) { + getAssociatedUrls(target.getBaseUrl()).forEach(url -> { + healthChecker.getAssociates().put(url, initHealthChecker(createTarget(url), config)); + }); + } }); }); } + private BaseHealthChecker initHealthChecker(T target, C config) { + BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target); + log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl()); + healthChecker.initialize(tbClient); + devices.add(target.getDeviceId()); + return healthChecker; + } + public final void runChecks() { if (healthCheckers.isEmpty()) { return; @@ -78,9 +101,8 @@ public abstract class BaseMonitoringService, T ext try (WsClient wsClient = wsClientFactory.createClient(accessToken)) { wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply(); - for (BaseHealthChecker healthChecker : healthCheckers) { - healthChecker.check(wsClient); + check(healthChecker, wsClient); } } reporter.reportLatencies(tbClient); @@ -94,8 +116,61 @@ public abstract class BaseMonitoringService, T ext } } + private void check(BaseHealthChecker healthChecker, WsClient wsClient) throws Exception { + healthChecker.check(wsClient); + + T target = healthChecker.getTarget(); + if (target.isCheckDomainIps()) { + Set associatedUrls = getAssociatedUrls(target.getBaseUrl()); + Map> associates = healthChecker.getAssociates(); + Set prevAssociatedUrls = new HashSet<>(associates.keySet()); + + boolean changed = false; + for (String url : associatedUrls) { + if (!prevAssociatedUrls.contains(url)) { + BaseHealthChecker associate = initHealthChecker(createTarget(url), healthChecker.getConfig()); + associates.put(url, associate); + changed = true; + } + } + for (String url : prevAssociatedUrls) { + if (!associatedUrls.contains(url)) { + stopHealthChecker(healthChecker); + associates.remove(url); + changed = true; + } + } + if (changed) { + log.info("Updated IPs for {}: {} (old list: {})", target.getBaseUrl(), associatedUrls, prevAssociatedUrls); + } + } + } + + @SneakyThrows + private Set getAssociatedUrls(String baseUrl) { + URI url = new URI(baseUrl); + return Arrays.stream(InetAddress.getAllByName(url.getHost())) + .map(InetAddress::getHostAddress) + .map(ip -> { + try { + return new URI(url.getScheme(), null, ip, url.getPort(), "", null, null).toString(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + } + + private void stopHealthChecker(BaseHealthChecker healthChecker) throws Exception { + healthChecker.destroyClient(); + devices.remove(healthChecker.getTarget().getDeviceId()); + log.info("Stopped {} for {}", healthChecker.getClass().getSimpleName(), healthChecker.getTarget().getBaseUrl()); + } + protected abstract BaseHealthChecker createHealthChecker(C config, T target); + protected abstract T createTarget(String baseUrl); + protected abstract String getName(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java index f41454a76a..aabf04caee 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -63,25 +63,20 @@ public class MonitoringReporter { private String reportingAssetId; public void reportLatencies(TbClient tbClient) { - List latencies = this.latencies.values().stream() - .filter(Latency::isNotEmpty) - .map(latency -> { - Latency snapshot = latency.snapshot(); - latency.reset(); - return snapshot; - }) - .collect(Collectors.toList()); if (latencies.isEmpty()) { return; } - log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms") + log.debug("Latencies:\n{}", latencies.values().stream().map(latency -> latency.getKey() + ": " + latency.getFormattedValue()) .collect(Collectors.joining("\n")) + "\n"); - if (!latencyReportingEnabled) return; - if (latencies.stream().anyMatch(latency -> latency.getAvg() >= (double) latencyThresholdMs)) { - HighLatencyNotification highLatencyNotification = new HighLatencyNotification(latencies, latencyThresholdMs); + List highLatencies = latencies.values().stream() + .filter(latency -> latency.getValue() >= (double) latencyThresholdMs) + .collect(Collectors.toList()); + if (!highLatencies.isEmpty()) { + HighLatencyNotification highLatencyNotification = new HighLatencyNotification(highLatencies, latencyThresholdMs); notificationService.sendNotification(highLatencyNotification); + log.warn("{}", highLatencyNotification.getText()); } try { @@ -99,10 +94,11 @@ public class MonitoringReporter { } ObjectNode msg = JacksonUtil.newObjectNode(); - latencies.forEach(latency -> { - msg.set(latency.getKey(), new DoubleNode(latency.getAvg())); + latencies.values().forEach(latency -> { + msg.set(latency.getKey(), new DoubleNode(latency.getValue())); }); tbClient.saveEntityTelemetry(new AssetId(UUID.fromString(reportingAssetId)), "time", msg); + latencies.clear(); } catch (Exception e) { log.error("Failed to report latencies: {}", e.getMessage()); } @@ -112,7 +108,7 @@ public class MonitoringReporter { String latencyKey = key + "Latency"; double latencyInMs = (double) latencyInNanos / 1000_000; log.trace("Reporting latency [{}]: {} ms", key, latencyInMs); - latencies.computeIfAbsent(latencyKey, k -> new Latency(latencyKey)).report(latencyInMs); + latencies.put(latencyKey, Latency.of(latencyKey, latencyInMs)); } public void serviceFailure(Object serviceKey, Throwable error) { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java index b3ce86e799..ca7c5a94da 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportsMonitoringService.java @@ -33,6 +33,13 @@ public final class TransportsMonitoringService extends BaseMonitoringService 20;" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 427, + "layoutY": 541 + }, + "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", + "name": "Test TBEL script", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return msg.temperature > 20;", + "tbelScript": "var a = \"a\";\nvar b = \"b\";\nreturn a.equals(\"a\") && b.equals(\"b\");" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 40, + "layoutY": 252 + }, + "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", + "name": "Add arrival timestamp", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", + "tbelScript": "metadata.arrivalTs = Date.now();\nreturn {msg: msg, metadata: metadata, msgType: msgType};" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 1467, + "layoutY": 267 + }, + "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", + "name": "Calculate additional latencies", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", + "tbelScript": "var arrivalLatency = metadata.arrivalTs - metadata.ts;\nvar processingTime = Date.now() - metadata.arrivalTs;\nmsg = {\n arrivalLatency: arrivalLatency,\n processingTime: processingTime\n};\nreturn {msg: msg, metadata: metadata, msgType: msgType};" + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 1438, + "layoutY": 403 + }, + "type": "org.thingsboard.rule.engine.transform.TbChangeOriginatorNode", + "name": "To latencies asset", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "originatorSource": "ENTITY", + "entityType": "ASSET", + "entityNamePattern": "[Monitoring] Latencies", + "relationsQuery": { + "direction": "FROM", + "maxLevel": 1, + "filters": [ + { + "relationType": "Contains", + "entityTypes": [] + } + ], + "fetchLastLevelOnly": false + } + }, + "externalId": null + }, + { + "additionalInfo": { + "description": null, + "layoutX": 1458, + "layoutY": 505 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 0 + }, + "externalId": null + }, + { + "additionalInfo": { + "description": "", + "layoutX": 928, + "layoutY": 266 + }, + "type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode", + "name": "Has testData", + "debugMode": false, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "messageNames": [ + "testData" + ], + "metadataNames": [], + "checkAllKeys": true + }, + "externalId": null + }, + { + "additionalInfo": { + "description": null, + "layoutX": 1203, + "layoutY": 327 + }, + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries with TTL", + "debugMode": true, + "singletonMode": false, + "configurationVersion": 0, + "configuration": { + "defaultTTL": 180, + "skipLatestPersistence": null, + "useServerTs": null + }, + "externalId": null + } + ], + "connections": [ + { + "fromIndex": 2, + "toIndex": 1, + "type": "Post attributes" + }, + { + "fromIndex": 2, + "toIndex": 3, + "type": "RPC Request from Device" + }, + { + "fromIndex": 2, + "toIndex": 4, + "type": "Other" + }, + { + "fromIndex": 2, + "toIndex": 5, + "type": "RPC Request to Device" + }, + { + "fromIndex": 2, + "toIndex": 16, + "type": "Post telemetry" + }, + { + "fromIndex": 6, + "toIndex": 2, + "type": "False" + }, + { + "fromIndex": 6, + "toIndex": 7, + "type": "True" + }, + { + "fromIndex": 7, + "toIndex": 2, + "type": "False" + }, + { + "fromIndex": 7, + "toIndex": 8, + "type": "True" + }, + { + "fromIndex": 8, + "toIndex": 2, + "type": "Success" + }, + { + "fromIndex": 9, + "toIndex": 10, + "type": "Success" + }, + { + "fromIndex": 10, + "toIndex": 11, + "type": "True" + }, + { + "fromIndex": 11, + "toIndex": 6, + "type": "True" + }, + { + "fromIndex": 12, + "toIndex": 9, + "type": "Success" + }, + { + "fromIndex": 13, + "toIndex": 14, + "type": "Success" + }, + { + "fromIndex": 14, + "toIndex": 15, + "type": "Success" + }, + { + "fromIndex": 16, + "toIndex": 0, + "type": "False" + }, + { + "fromIndex": 16, + "toIndex": 17, + "type": "True" + }, + { + "fromIndex": 17, + "toIndex": 13, + "type": "Success" + } + ], + "ruleChainConnections": null + } +} \ No newline at end of file diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 88979bff1a..57b9b39ec0 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -51,8 +51,10 @@ monitoring: # MQTT QoS qos: '${MQTT_QOS_LEVEL:1}' targets: - # MQTT transport base url, tcp://DOMAIN:1883 by default + # MQTT transport base url, tcp://DOMAIN:1883 by default - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' + # Whether to monitor IPs associated with the domain from base url + check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc. @@ -62,8 +64,10 @@ monitoring: # CoAP request timeout in milliseconds request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}' targets: - # CoAP transport base url, coap://DOMAIN by default + # CoAP transport base url, coap://DOMAIN by default - base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}' + # Whether to monitor IPs associated with the domain from base url + check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc. @@ -73,8 +77,10 @@ monitoring: # HTTP request timeout in milliseconds request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}' targets: - # HTTP transport base url, http://DOMAIN by default + # HTTP transport base url, http://DOMAIN by default - base_url: '${HTTP_TRANSPORT_BASE_URL:http://${monitoring.domain}}' + # Whether to monitor IPs associated with the domain from base url + check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc. @@ -84,8 +90,10 @@ monitoring: # LwM2M request timeout in milliseconds request_timeout_ms: '${LWM2M_REQUEST_TIMEOUT_MS:4000}' targets: - # LwM2M transport base url, coap://DOMAIN:5685 by default + # LwM2M transport base url, coap://DOMAIN:5685 by default - base_url: '${LWM2M_TRANSPORT_BASE_URL:coap://${monitoring.domain}:5685}' + # Whether to monitor IPs associated with the domain from base url + check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.