Merge pull request #8671 from thingsboard/feature/monitoring-refactoring
Monitoring service refactoring
This commit is contained in:
commit
4d04e51e12
@ -16,21 +16,47 @@
|
||||
package org.thingsboard.monitoring;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.monitoring.service.BaseMonitoringService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@Slf4j
|
||||
public class ThingsboardMonitoringApplication {
|
||||
|
||||
@Autowired
|
||||
private List<BaseMonitoringService<?, ?>> monitoringServices;
|
||||
|
||||
@Value("${monitoring.monitoring_rate_ms}")
|
||||
private int monitoringRateMs;
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(ThingsboardMonitoringApplication.class)
|
||||
.properties(Map.of("spring.config.name", "tb-monitoring"))
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void startMonitoring() {
|
||||
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor"));
|
||||
scheduler.scheduleWithFixedDelay(() -> {
|
||||
monitoringServices.forEach(monitoringService -> {
|
||||
monitoringService.runChecks();
|
||||
});
|
||||
}, 0, monitoringRateMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,12 +15,10 @@
|
||||
*/
|
||||
package org.thingsboard.monitoring.config;
|
||||
|
||||
import lombok.Data;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class MonitoringTargetConfig {
|
||||
public interface MonitoringConfig<T extends MonitoringTarget> {
|
||||
|
||||
private String baseUrl;
|
||||
private DeviceConfig device;
|
||||
List<T> getTargets();
|
||||
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public interface MonitoringTarget {
|
||||
|
||||
UUID getDeviceId();
|
||||
|
||||
}
|
||||
@ -13,12 +13,11 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.service;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "monitoring.transports.coap.enabled", havingValue = "true")
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -25,6 +25,7 @@ import java.util.UUID;
|
||||
public class DeviceConfig {
|
||||
|
||||
private UUID id;
|
||||
private String name;
|
||||
private DeviceCredentials credentials;
|
||||
|
||||
public void setId(String id) {
|
||||
@ -13,12 +13,11 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.service;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "monitoring.transports.http.enabled", havingValue = "true")
|
||||
@ -13,12 +13,11 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.service;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "monitoring.transports.lwm2m.enabled", havingValue = "true")
|
||||
@ -13,14 +13,13 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.service;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(name = "monitoring.transports.mqtt.enabled", havingValue = "true")
|
||||
@ -13,20 +13,19 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.data;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
@Data
|
||||
public class TransportInfo {
|
||||
|
||||
private final TransportType transportType;
|
||||
private final String url;
|
||||
private final String baseUrl;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s (%s)", transportType, url);
|
||||
return String.format("%s transport (%s)", transportType, baseUrl);
|
||||
}
|
||||
|
||||
}
|
||||
@ -13,20 +13,19 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.service;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.MonitoringConfig;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public abstract class TransportMonitoringConfig {
|
||||
public abstract class TransportMonitoringConfig implements MonitoringConfig<TransportMonitoringTarget> {
|
||||
|
||||
private int requestTimeoutMs;
|
||||
|
||||
private List<MonitoringTargetConfig> targets;
|
||||
private List<TransportMonitoringTarget> targets;
|
||||
|
||||
public abstract TransportType getTransportType();
|
||||
|
||||
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.monitoring.config.MonitoringTarget;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
public class TransportMonitoringTarget implements MonitoringTarget {
|
||||
|
||||
private String baseUrl;
|
||||
private DeviceConfig device; // set manually during initialization
|
||||
|
||||
@Override
|
||||
public UUID getDeviceId() {
|
||||
return device.getId();
|
||||
}
|
||||
|
||||
}
|
||||
@ -13,15 +13,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.config;
|
||||
package org.thingsboard.monitoring.config.transport;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.thingsboard.monitoring.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.transport.impl.CoapTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.transport.impl.HttpTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.transport.impl.Lwm2mTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.transport.impl.MqttTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.service.transport.impl.CoapTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.service.transport.impl.HttpTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.service.transport.impl.Lwm2mTransportHealthChecker;
|
||||
import org.thingsboard.monitoring.service.transport.impl.MqttTransportHealthChecker;
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@ -15,16 +15,14 @@
|
||||
*/
|
||||
package org.thingsboard.monitoring.data;
|
||||
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
|
||||
public class Latencies {
|
||||
|
||||
public static final String WS_UPDATE = "wsUpdate";
|
||||
public static final String WS_CONNECT = "wsConnect";
|
||||
public static final String LOG_IN = "logIn";
|
||||
|
||||
public static String transportRequest(TransportType transportType) {
|
||||
return String.format("%sTransportRequest", transportType.name().toLowerCase());
|
||||
public static String request(String key) {
|
||||
return String.format("%sRequest", key);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -15,13 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.monitoring.data;
|
||||
|
||||
public class TransportFailureException extends RuntimeException {
|
||||
public class ServiceFailureException extends RuntimeException {
|
||||
|
||||
public TransportFailureException(Throwable cause) {
|
||||
public ServiceFailureException(Throwable cause) {
|
||||
super(cause.getMessage(), cause);
|
||||
}
|
||||
|
||||
public TransportFailureException(String message) {
|
||||
public ServiceFailureException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@ -13,34 +13,33 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport;
|
||||
package org.thingsboard.monitoring.service;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.TextNode;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.monitoring.client.TbClient;
|
||||
import org.thingsboard.monitoring.client.WsClient;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.TransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.MonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.MonitoringTarget;
|
||||
import org.thingsboard.monitoring.data.Latencies;
|
||||
import org.thingsboard.monitoring.data.MonitoredServiceKey;
|
||||
import org.thingsboard.monitoring.data.TransportFailureException;
|
||||
import org.thingsboard.monitoring.data.TransportInfo;
|
||||
import org.thingsboard.monitoring.service.MonitoringReporter;
|
||||
import org.thingsboard.monitoring.data.ServiceFailureException;
|
||||
import org.thingsboard.monitoring.util.TbStopWatch;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.UUID;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> {
|
||||
public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends MonitoringTarget> {
|
||||
|
||||
protected final C config;
|
||||
protected final MonitoringTargetConfig target;
|
||||
private TransportInfo transportInfo;
|
||||
protected final T target;
|
||||
|
||||
private Object info;
|
||||
|
||||
@Autowired
|
||||
private MonitoringReporter reporter;
|
||||
@ -51,73 +50,66 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
|
||||
|
||||
public static final String TEST_TELEMETRY_KEY = "testData";
|
||||
|
||||
protected TransportHealthChecker(C config, MonitoringTargetConfig target) {
|
||||
this.config = config;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
transportInfo = new TransportInfo(getTransportType(), target.getBaseUrl());
|
||||
info = getInfo();
|
||||
}
|
||||
|
||||
protected abstract void initialize(TbClient tbClient);
|
||||
|
||||
public final void check(WsClient wsClient) {
|
||||
log.debug("[{}] Checking", transportInfo);
|
||||
log.debug("[{}] Checking", info);
|
||||
try {
|
||||
wsClient.registerWaitForUpdate();
|
||||
|
||||
String testValue = UUID.randomUUID().toString();
|
||||
String testPayload = createTestPayload(testValue);
|
||||
try {
|
||||
initClientAndSendPayload(testPayload);
|
||||
log.trace("[{}] Sent test payload ({})", transportInfo, testPayload);
|
||||
initClient();
|
||||
stopWatch.start();
|
||||
sendTestPayload(testPayload);
|
||||
reporter.reportLatency(Latencies.request(getKey()), stopWatch.getTime());
|
||||
log.trace("[{}] Sent test payload ({})", info, testPayload);
|
||||
} catch (Throwable e) {
|
||||
throw new TransportFailureException(e);
|
||||
throw new ServiceFailureException(e);
|
||||
}
|
||||
|
||||
log.trace("[{}] Waiting for WS update", transportInfo);
|
||||
log.trace("[{}] Waiting for WS update", info);
|
||||
checkWsUpdate(wsClient, testValue);
|
||||
|
||||
reporter.serviceIsOk(transportInfo);
|
||||
reporter.serviceIsOk(info);
|
||||
reporter.serviceIsOk(MonitoredServiceKey.GENERAL);
|
||||
} catch (TransportFailureException transportFailureException) {
|
||||
reporter.serviceFailure(transportInfo, transportFailureException);
|
||||
} catch (ServiceFailureException serviceFailureException) {
|
||||
reporter.serviceFailure(info, serviceFailureException);
|
||||
} catch (Exception e) {
|
||||
reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void initClientAndSendPayload(String payload) throws Throwable {
|
||||
initClient();
|
||||
stopWatch.start();
|
||||
sendTestPayload(payload);
|
||||
reporter.reportLatency(Latencies.transportRequest(getTransportType()), stopWatch.getTime());
|
||||
}
|
||||
|
||||
private void checkWsUpdate(WsClient wsClient, String testValue) {
|
||||
stopWatch.start();
|
||||
wsClient.waitForUpdate(resultCheckTimeoutMs);
|
||||
log.trace("[{}] Waited for WS update. Last WS msg: {}", transportInfo, wsClient.lastMsg);
|
||||
Object update = wsClient.getTelemetryUpdate(target.getDevice().getId(), TEST_TELEMETRY_KEY);
|
||||
log.trace("[{}] Waited for WS update. Last WS msg: {}", info, wsClient.lastMsg);
|
||||
Object update = wsClient.getTelemetryUpdate(target.getDeviceId(), TEST_TELEMETRY_KEY);
|
||||
if (update == null) {
|
||||
throw new TransportFailureException("No WS update arrived within " + resultCheckTimeoutMs + " ms");
|
||||
throw new ServiceFailureException("No WS update arrived within " + resultCheckTimeoutMs + " ms");
|
||||
} else if (!update.toString().equals(testValue)) {
|
||||
throw new TransportFailureException("Was expecting value " + testValue + " but got " + update);
|
||||
throw new ServiceFailureException("Was expecting value " + testValue + " but got " + update);
|
||||
}
|
||||
reporter.reportLatency(Latencies.WS_UPDATE, stopWatch.getTime());
|
||||
}
|
||||
|
||||
protected String createTestPayload(String testValue) {
|
||||
return JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString();
|
||||
}
|
||||
|
||||
protected abstract void initClient() throws Exception;
|
||||
|
||||
protected abstract String createTestPayload(String testValue);
|
||||
|
||||
protected abstract void sendTestPayload(String payload) throws Exception;
|
||||
|
||||
@PreDestroy
|
||||
protected abstract void destroyClient() throws Exception;
|
||||
|
||||
protected abstract TransportType getTransportType();
|
||||
protected abstract Object getInfo();
|
||||
protected abstract String getKey();
|
||||
|
||||
}
|
||||
@ -0,0 +1,101 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.service;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.thingsboard.monitoring.client.TbClient;
|
||||
import org.thingsboard.monitoring.client.WsClient;
|
||||
import org.thingsboard.monitoring.client.WsClientFactory;
|
||||
import org.thingsboard.monitoring.config.MonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.MonitoringTarget;
|
||||
import org.thingsboard.monitoring.data.Latencies;
|
||||
import org.thingsboard.monitoring.data.MonitoredServiceKey;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.util.TbStopWatch;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T extends MonitoringTarget> {
|
||||
|
||||
@Autowired
|
||||
private List<C> configs;
|
||||
private final List<BaseHealthChecker<C, T>> healthCheckers = new LinkedList<>();
|
||||
private final List<UUID> devices = new LinkedList<>();
|
||||
|
||||
@Autowired
|
||||
private TbClient tbClient;
|
||||
@Autowired
|
||||
private WsClientFactory wsClientFactory;
|
||||
@Autowired
|
||||
private TbStopWatch stopWatch;
|
||||
@Autowired
|
||||
private MonitoringReporter reporter;
|
||||
@Autowired
|
||||
protected ApplicationContext applicationContext;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
tbClient.logIn();
|
||||
configs.forEach(config -> {
|
||||
config.getTargets().forEach(target -> {
|
||||
BaseHealthChecker<C, T> healthChecker = (BaseHealthChecker<C, T>) createHealthChecker(config, target);
|
||||
log.info("Initializing {}", healthChecker.getClass().getSimpleName());
|
||||
healthChecker.initialize(tbClient);
|
||||
devices.add(target.getDeviceId());
|
||||
healthCheckers.add(healthChecker);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public final void runChecks() {
|
||||
if (healthCheckers.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
log.info("Starting {}", getName());
|
||||
stopWatch.start();
|
||||
String accessToken = tbClient.logIn();
|
||||
reporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime());
|
||||
|
||||
try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
|
||||
wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply();
|
||||
|
||||
for (BaseHealthChecker<C, T> healthChecker : healthCheckers) {
|
||||
healthChecker.check(wsClient);
|
||||
}
|
||||
}
|
||||
reporter.reportLatencies(tbClient);
|
||||
log.debug("Finished {}", getName());
|
||||
} catch (Throwable error) {
|
||||
try {
|
||||
reporter.serviceFailure(MonitoredServiceKey.GENERAL, error);
|
||||
} catch (Throwable reportError) {
|
||||
log.error("Error occurred during service failure reporting", reportError);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract BaseHealthChecker<?, ?> createHealthChecker(C config, T target);
|
||||
|
||||
protected abstract String getName();
|
||||
|
||||
}
|
||||
@ -52,8 +52,8 @@ public class MonitoringReporter {
|
||||
|
||||
@Value("${monitoring.failures_threshold}")
|
||||
private int failuresThreshold;
|
||||
@Value("${monitoring.send_repeated_failure_notification}")
|
||||
private boolean sendRepeatedFailureNotification;
|
||||
@Value("${monitoring.repeated_failure_notification}")
|
||||
private int repeatedFailureNotification;
|
||||
|
||||
@Value("${monitoring.latency.enabled}")
|
||||
private boolean latencyReportingEnabled;
|
||||
@ -75,7 +75,7 @@ public class MonitoringReporter {
|
||||
return;
|
||||
}
|
||||
log.info("Latencies:\n{}", latencies.stream().map(latency -> latency.getKey() + ": " + latency.getAvg() + " ms")
|
||||
.collect(Collectors.joining("\n")));
|
||||
.collect(Collectors.joining("\n")) + "\n");
|
||||
|
||||
if (!latencyReportingEnabled) return;
|
||||
|
||||
@ -86,7 +86,7 @@ public class MonitoringReporter {
|
||||
|
||||
try {
|
||||
if (StringUtils.isBlank(reportingAssetId)) {
|
||||
String assetName = "Monitoring";
|
||||
String assetName = "[Monitoring] Latencies";
|
||||
Asset monitoringAsset = tbClient.findAsset(assetName).orElseGet(() -> {
|
||||
Asset asset = new Asset();
|
||||
asset.setType("Monitoring");
|
||||
@ -122,7 +122,7 @@ public class MonitoringReporter {
|
||||
int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet();
|
||||
ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount);
|
||||
log.error(notification.getText());
|
||||
if (failuresCount == failuresThreshold || (sendRepeatedFailureNotification && failuresCount % failuresThreshold == 0)) {
|
||||
if (failuresCount == failuresThreshold || (repeatedFailureNotification != 0 && failuresCount % repeatedFailureNotification == 0)) {
|
||||
notificationService.sendNotification(notification);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,139 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.service.transport;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.TextNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.monitoring.client.TbClient;
|
||||
import org.thingsboard.monitoring.config.transport.DeviceConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportInfo;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.config.transport.TransportType;
|
||||
import org.thingsboard.monitoring.service.BaseHealthChecker;
|
||||
import org.thingsboard.monitoring.util.ResourceUtils;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.TbResource;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DeviceData;
|
||||
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
|
||||
@Slf4j
|
||||
public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> {
|
||||
|
||||
private static final String DEFAULT_DEVICE_NAME = "[Monitoring] %s transport (%s)";
|
||||
private static final String DEFAULT_PROFILE_NAME = "[Monitoring] %s";
|
||||
|
||||
public TransportHealthChecker(C config, TransportMonitoringTarget target) {
|
||||
super(config, target);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initialize(TbClient tbClient) {
|
||||
String deviceName = String.format(DEFAULT_DEVICE_NAME, config.getTransportType(), target.getBaseUrl());
|
||||
Device device = tbClient.getTenantDevice(deviceName)
|
||||
.orElseGet(() -> {
|
||||
log.info("Creating new device '{}'", deviceName);
|
||||
return createDevice(config.getTransportType(), deviceName, tbClient);
|
||||
});
|
||||
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId())
|
||||
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId()));
|
||||
|
||||
DeviceConfig deviceConfig = new DeviceConfig();
|
||||
deviceConfig.setId(device.getId().toString());
|
||||
deviceConfig.setName(deviceName);
|
||||
deviceConfig.setCredentials(credentials);
|
||||
target.setDevice(deviceConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createTestPayload(String testValue) {
|
||||
return JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object getInfo() {
|
||||
return new TransportInfo(getTransportType(), target.getBaseUrl());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getKey() {
|
||||
return getTransportType().name().toLowerCase() + "Transport";
|
||||
}
|
||||
|
||||
protected abstract TransportType getTransportType();
|
||||
|
||||
|
||||
private Device createDevice(TransportType transportType, String name, TbClient tbClient) {
|
||||
Device device = new Device();
|
||||
device.setName(name);
|
||||
|
||||
DeviceCredentials credentials = new DeviceCredentials();
|
||||
credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20));
|
||||
|
||||
DeviceData deviceData = new DeviceData();
|
||||
deviceData.setConfiguration(new DefaultDeviceConfiguration());
|
||||
if (transportType != TransportType.LWM2M) {
|
||||
device.setType("default");
|
||||
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
|
||||
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
||||
} else {
|
||||
tbClient.getResources(new PageLink(1, 0, "lwm2m monitoring")).getData()
|
||||
.stream().findFirst()
|
||||
.orElseGet(() -> {
|
||||
TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
|
||||
log.info("Creating LwM2M resource");
|
||||
return tbClient.saveResource(newResource);
|
||||
});
|
||||
String profileName = String.format(DEFAULT_PROFILE_NAME, transportType);
|
||||
DeviceProfile profile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData()
|
||||
.stream().findFirst()
|
||||
.orElseGet(() -> {
|
||||
DeviceProfile newProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
|
||||
newProfile.setName(profileName);
|
||||
log.info("Creating LwM2M device profile");
|
||||
return tbClient.saveDeviceProfile(newProfile);
|
||||
});
|
||||
device.setType(profileName);
|
||||
device.setDeviceProfileId(profile.getId());
|
||||
deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration());
|
||||
|
||||
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
|
||||
LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials();
|
||||
NoSecClientCredential client = new NoSecClientCredential();
|
||||
client.setEndpoint(credentials.getCredentialsId());
|
||||
lwm2mCreds.setClient(client);
|
||||
LwM2MBootstrapClientCredentials bootstrap = new LwM2MBootstrapClientCredentials();
|
||||
bootstrap.setBootstrapServer(new NoSecBootstrapClientCredential());
|
||||
bootstrap.setLwm2mServer(new NoSecBootstrapClientCredential());
|
||||
lwm2mCreds.setBootstrap(bootstrap);
|
||||
credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds));
|
||||
}
|
||||
return tbClient.saveDeviceWithCredentials(device, credentials).get();
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.service.transport;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.service.BaseHealthChecker;
|
||||
import org.thingsboard.monitoring.service.BaseMonitoringService;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public final class TransportsMonitoringService extends BaseMonitoringService<TransportMonitoringConfig, TransportMonitoringTarget> {
|
||||
|
||||
@Override
|
||||
protected BaseHealthChecker<?, ?> createHealthChecker(TransportMonitoringConfig config, TransportMonitoringTarget target) {
|
||||
return applicationContext.getBean(config.getTransportType().getServiceClass(), config, target);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getName() {
|
||||
return "transports check";
|
||||
}
|
||||
|
||||
}
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport.impl;
|
||||
package org.thingsboard.monitoring.service.transport.impl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapClient;
|
||||
@ -23,10 +23,10 @@ import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.CoapTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.config.transport.CoapTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.config.transport.TransportType;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@ -37,7 +37,7 @@ public class CoapTransportHealthChecker extends TransportHealthChecker<CoapTrans
|
||||
|
||||
private CoapClient coapClient;
|
||||
|
||||
protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, MonitoringTargetConfig target) {
|
||||
protected CoapTransportHealthChecker(CoapTransportMonitoringConfig config, TransportMonitoringTarget target) {
|
||||
super(config, target);
|
||||
}
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport.impl;
|
||||
package org.thingsboard.monitoring.service.transport.impl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
@ -21,10 +21,10 @@ import org.springframework.boot.web.client.RestTemplateBuilder;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.HttpTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.config.transport.HttpTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.config.transport.TransportType;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
@ -35,7 +35,7 @@ public class HttpTransportHealthChecker extends TransportHealthChecker<HttpTrans
|
||||
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, MonitoringTargetConfig target) {
|
||||
protected HttpTransportHealthChecker(HttpTransportMonitoringConfig config, TransportMonitoringTarget target) {
|
||||
super(config, target);
|
||||
}
|
||||
|
||||
@ -13,17 +13,17 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport.impl;
|
||||
package org.thingsboard.monitoring.service.transport.impl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.monitoring.client.Lwm2mClient;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.Lwm2mTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.config.transport.Lwm2mTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.config.transport.TransportType;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
|
||||
@Service
|
||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
@ -32,7 +32,7 @@ public class Lwm2mTransportHealthChecker extends TransportHealthChecker<Lwm2mTra
|
||||
|
||||
private Lwm2mClient lwm2mClient;
|
||||
|
||||
protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, MonitoringTargetConfig target) {
|
||||
protected Lwm2mTransportHealthChecker(Lwm2mTransportMonitoringConfig config, TransportMonitoringTarget target) {
|
||||
super(config, target);
|
||||
}
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport.impl;
|
||||
package org.thingsboard.monitoring.service.transport.impl;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.IMqttToken;
|
||||
@ -25,10 +25,10 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.context.annotation.Scope;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.MqttTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.transport.TransportHealthChecker;
|
||||
import org.thingsboard.monitoring.config.transport.MqttTransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
|
||||
import org.thingsboard.monitoring.config.transport.TransportType;
|
||||
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
|
||||
|
||||
@Component
|
||||
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
||||
@ -39,7 +39,7 @@ public class MqttTransportHealthChecker extends TransportHealthChecker<MqttTrans
|
||||
|
||||
private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry";
|
||||
|
||||
protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, MonitoringTargetConfig target) {
|
||||
protected MqttTransportHealthChecker(MqttTransportMonitoringConfig config, TransportMonitoringTarget target) {
|
||||
super(config, target);
|
||||
}
|
||||
|
||||
@ -1,198 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.monitoring.transport;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.monitoring.client.TbClient;
|
||||
import org.thingsboard.monitoring.client.WsClient;
|
||||
import org.thingsboard.monitoring.client.WsClientFactory;
|
||||
import org.thingsboard.monitoring.config.DeviceConfig;
|
||||
import org.thingsboard.monitoring.config.MonitoringTargetConfig;
|
||||
import org.thingsboard.monitoring.config.TransportType;
|
||||
import org.thingsboard.monitoring.config.service.TransportMonitoringConfig;
|
||||
import org.thingsboard.monitoring.data.Latencies;
|
||||
import org.thingsboard.monitoring.data.MonitoredServiceKey;
|
||||
import org.thingsboard.monitoring.service.MonitoringReporter;
|
||||
import org.thingsboard.monitoring.util.ResourceUtils;
|
||||
import org.thingsboard.monitoring.util.TbStopWatch;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.TbResource;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential;
|
||||
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.device.data.DeviceData;
|
||||
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public final class TransportMonitoringService {
|
||||
|
||||
private final List<TransportMonitoringConfig> configs;
|
||||
private final List<TransportHealthChecker<?>> transportHealthCheckers = new LinkedList<>();
|
||||
private final List<UUID> devices = new LinkedList<>();
|
||||
|
||||
private final TbClient tbClient;
|
||||
private final WsClientFactory wsClientFactory;
|
||||
private final TbStopWatch stopWatch;
|
||||
private final MonitoringReporter reporter;
|
||||
private final ApplicationContext applicationContext;
|
||||
private ScheduledExecutorService scheduler;
|
||||
@Value("${monitoring.transports.monitoring_rate_ms}")
|
||||
private int monitoringRateMs;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
configs.forEach(config -> {
|
||||
config.getTargets().stream()
|
||||
.filter(target -> StringUtils.isNotBlank(target.getBaseUrl()))
|
||||
.peek(target -> checkMonitoringTarget(config, target, tbClient))
|
||||
.forEach(target -> {
|
||||
TransportHealthChecker<?> transportHealthChecker = applicationContext.getBean(config.getTransportType().getServiceClass(), config, target);
|
||||
transportHealthCheckers.add(transportHealthChecker);
|
||||
devices.add(target.getDevice().getId());
|
||||
});
|
||||
});
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor"));
|
||||
}
|
||||
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void startMonitoring() {
|
||||
scheduler.scheduleWithFixedDelay(() -> {
|
||||
try {
|
||||
log.debug("Starting transports check");
|
||||
stopWatch.start();
|
||||
String accessToken = tbClient.logIn();
|
||||
reporter.reportLatency(Latencies.LOG_IN, stopWatch.getTime());
|
||||
|
||||
try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
|
||||
wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply();
|
||||
|
||||
for (TransportHealthChecker<?> transportHealthChecker : transportHealthCheckers) {
|
||||
transportHealthChecker.check(wsClient);
|
||||
}
|
||||
}
|
||||
reporter.reportLatencies(tbClient);
|
||||
log.debug("Finished transports check");
|
||||
} catch (Throwable error) {
|
||||
try {
|
||||
reporter.serviceFailure(MonitoredServiceKey.GENERAL, error);
|
||||
} catch (Throwable reportError) {
|
||||
log.error("Error occurred during service failure reporting", reportError);
|
||||
}
|
||||
}
|
||||
}, 0, monitoringRateMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private void checkMonitoringTarget(TransportMonitoringConfig config, MonitoringTargetConfig target, TbClient tbClient) {
|
||||
DeviceConfig deviceConfig = target.getDevice();
|
||||
tbClient.logIn();
|
||||
|
||||
DeviceId deviceId;
|
||||
if (deviceConfig == null || deviceConfig.getId() == null) {
|
||||
String deviceName = String.format("[%s] Monitoring device (%s)", config.getTransportType(), target.getBaseUrl());
|
||||
Device device = tbClient.getTenantDevice(deviceName)
|
||||
.orElseGet(() -> {
|
||||
log.info("Creating new device '{}'", deviceName);
|
||||
return createDevice(config.getTransportType(), deviceName, tbClient);
|
||||
});
|
||||
deviceId = device.getId();
|
||||
target.getDevice().setId(deviceId.toString());
|
||||
} else {
|
||||
deviceId = new DeviceId(deviceConfig.getId());
|
||||
}
|
||||
|
||||
log.info("Using device {} for {} monitoring", deviceId, config.getTransportType());
|
||||
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(deviceId)
|
||||
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + deviceId));
|
||||
target.getDevice().setCredentials(credentials);
|
||||
}
|
||||
|
||||
private Device createDevice(TransportType transportType, String name, TbClient tbClient) {
|
||||
Device device = new Device();
|
||||
device.setName(name);
|
||||
|
||||
DeviceCredentials credentials = new DeviceCredentials();
|
||||
credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20));
|
||||
|
||||
DeviceData deviceData = new DeviceData();
|
||||
deviceData.setConfiguration(new DefaultDeviceConfiguration());
|
||||
if (transportType != TransportType.LWM2M) {
|
||||
device.setType("default");
|
||||
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
|
||||
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
||||
} else {
|
||||
tbClient.getResources(new PageLink(1, 0, "lwm2m monitoring")).getData()
|
||||
.stream().findFirst()
|
||||
.orElseGet(() -> {
|
||||
TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
|
||||
log.info("Creating LwM2M resource");
|
||||
return tbClient.saveResource(newResource);
|
||||
});
|
||||
String profileName = "LwM2M Monitoring";
|
||||
DeviceProfile profile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData()
|
||||
.stream().findFirst()
|
||||
.orElseGet(() -> {
|
||||
DeviceProfile newProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
|
||||
newProfile.setName(profileName);
|
||||
log.info("Creating LwM2M device profile");
|
||||
return tbClient.saveDeviceProfile(newProfile);
|
||||
});
|
||||
device.setType(profileName);
|
||||
device.setDeviceProfileId(profile.getId());
|
||||
deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration());
|
||||
|
||||
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
|
||||
LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials();
|
||||
NoSecClientCredential client = new NoSecClientCredential();
|
||||
client.setEndpoint(credentials.getCredentialsId());
|
||||
lwm2mCreds.setClient(client);
|
||||
LwM2MBootstrapClientCredentials bootstrap = new LwM2MBootstrapClientCredentials();
|
||||
bootstrap.setBootstrapServer(new NoSecBootstrapClientCredential());
|
||||
bootstrap.setLwm2mServer(new NoSecBootstrapClientCredential());
|
||||
lwm2mCreds.setBootstrap(bootstrap);
|
||||
credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds));
|
||||
}
|
||||
return tbClient.saveDeviceWithCredentials(device, credentials).get();
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,5 +1,4 @@
|
||||
{
|
||||
"name": "LwM2M Monitoring",
|
||||
"type": "DEFAULT",
|
||||
"image": null,
|
||||
"defaultQueueName": null,
|
||||
|
||||
@ -32,79 +32,62 @@ monitoring:
|
||||
# WebSocket request timeout
|
||||
request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:3000}'
|
||||
|
||||
# Checks frequency in milliseconds
|
||||
monitoring_rate_ms: '${MONITORING_RATE_MS:10000}'
|
||||
# Maximum time between request to transport and WebSocket update
|
||||
check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}'
|
||||
|
||||
# Failures threshold for notifying
|
||||
failures_threshold: '${FAILURES_THRESHOLD:2}'
|
||||
# Whether to notify about next failures after first notification (will notify after each FAILURES_THRESHOLD failures)
|
||||
send_repeated_failure_notification: '${SEND_REPEATED_FAILURE_NOTIFICATION:true}'
|
||||
# Notify after each REPEATED_FAILURE_NOTIFICATION subsequent failures, 0 to notify only once on first failure
|
||||
repeated_failure_notification: '${REPEATED_FAILURE_NOTIFICATION:4}'
|
||||
|
||||
transports:
|
||||
# Transports check frequency in milliseconds
|
||||
monitoring_rate_ms: '${TRANSPORTS_MONITORING_RATE_MS:10000}'
|
||||
|
||||
mqtt:
|
||||
# Enable MQTT checks
|
||||
# Enable MQTT transport checks
|
||||
enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}'
|
||||
# MQTT request timeout in milliseconds
|
||||
request_timeout_ms: '${MQTT_REQUEST_TIMEOUT_MS:4000}'
|
||||
# MQTT QoS
|
||||
qos: '${MQTT_QOS_LEVEL:1}'
|
||||
targets:
|
||||
# MQTT base url, tcp://DOMAIN:1883 by default
|
||||
# MQTT transport base url, tcp://DOMAIN:1883 by default
|
||||
- base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}'
|
||||
device:
|
||||
# MQTT device to push telemetry for. If not set - device will be found or created automatically
|
||||
id: '${MQTT_TRANSPORT_TARGET_DEVICE_ID:}'
|
||||
# To add more targets, use following environment variables:
|
||||
# monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[1].device.id,
|
||||
# monitoring.transports.mqtt.targets[2].base_url, monitoring.transports.mqtt.targets[2].device.id, etc.
|
||||
# monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc.
|
||||
|
||||
coap:
|
||||
# Enable CoAP checks
|
||||
# Enable CoAP transport checks
|
||||
enabled: '${COAP_TRANSPORT_MONITORING_ENABLED:true}'
|
||||
# CoAP request timeout in milliseconds
|
||||
request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}'
|
||||
targets:
|
||||
# CoAP base url, coap://DOMAIN by default
|
||||
# CoAP transport base url, coap://DOMAIN by default
|
||||
- base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}'
|
||||
# CoAP device to push telemetry for. If not set - device will be found or created automatically
|
||||
device:
|
||||
id: '${COAP_TRANSPORT_TARGET_DEVICE_ID:}'
|
||||
# To add more targets, use following environment variables:
|
||||
# monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[1].device.id,
|
||||
# monitoring.transports.coap.targets[2].base_url, monitoring.transports.coap.targets[2].device.id, etc.
|
||||
# monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc.
|
||||
|
||||
http:
|
||||
# Enable HTTP checks
|
||||
# Enable HTTP transport checks
|
||||
enabled: '${HTTP_TRANSPORT_MONITORING_ENABLED:true}'
|
||||
# HTTP request timeout in milliseconds
|
||||
request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}'
|
||||
targets:
|
||||
# HTTP base url, https://DOMAIN by default
|
||||
- base_url: '${HTTP_TRANSPORT_BASE_URL:https://${monitoring.domain}}'
|
||||
device:
|
||||
# HTTP device to push telemetry for. If not set - device will be found or created automatically
|
||||
id: '${HTTP_TRANSPORT_TARGET_DEVICE_ID:}'
|
||||
# HTTP transport base url, http://DOMAIN by default
|
||||
- base_url: '${HTTP_TRANSPORT_BASE_URL:http://${monitoring.domain}}'
|
||||
# To add more targets, use following environment variables:
|
||||
# monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[1].device.id,
|
||||
# monitoring.transports.http.targets[2].base_url, monitoring.transports.http.targets[2].device.id, etc.
|
||||
# monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc.
|
||||
|
||||
lwm2m:
|
||||
# Enable LwM2M checks
|
||||
# Enable LwM2M transport checks
|
||||
enabled: '${LWM2M_TRANSPORT_MONITORING_ENABLED:true}'
|
||||
# LwM2M request timeout in milliseconds
|
||||
request_timeout_ms: '${LWM2M_REQUEST_TIMEOUT_MS:4000}'
|
||||
targets:
|
||||
# LwM2M base url, coap://DOMAIN:5685 by default
|
||||
# LwM2M transport base url, coap://DOMAIN:5685 by default
|
||||
- base_url: '${LWM2M_TRANSPORT_BASE_URL:coap://${monitoring.domain}:5685}'
|
||||
# LwM2M device to push telemetry for. If not set - device will be found or created automatically
|
||||
device:
|
||||
id: '${LWM2M_TRANSPORT_TARGET_DEVICE_ID:}'
|
||||
# To add more targets, use following environment variables:
|
||||
# monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[1].device.id,
|
||||
# monitoring.transports.lwm2m.targets[2].base_url, monitoring.transports.lwm2m.targets[2].device.id, etc.
|
||||
# monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.
|
||||
|
||||
notification_channels:
|
||||
slack:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user