diff --git a/monitoring/pom.xml b/monitoring/pom.xml new file mode 100644 index 0000000000..59fdd5f1df --- /dev/null +++ b/monitoring/pom.xml @@ -0,0 +1,132 @@ + + + + 4.0.0 + + org.thingsboard + 3.4.2-SNAPSHOT + thingsboard + + monitoring + 3.4.2-SNAPSHOT + Monitoring service + jar + + + UTF-8 + ${basedir}/.. + monitoring-service + + + + + org.thingsboard.common + data + + + org.thingsboard.common + util + + + org.thingsboard + rest-client + provided + + + org.springframework.boot + spring-boot-starter-web + + + + org.eclipse.californium + californium-core + 2.6.1 + + + org.eclipse.californium + scandium + 2.6.1 + + + org.eclipse.leshan + leshan-client-cf + 2.0.0-M4 + + + org.eclipse.leshan + leshan-core + 2.0.0-M4 + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + org.apache.httpcomponents + httpclient + + + org.java-websocket + Java-WebSocket + compile + + + org.apache.commons + commons-lang3 + + + org.slf4j + slf4j-api + + + org.slf4j + log4j-over-slf4j + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + + + + ${pkg.name}-${project.version} + + + ${project.basedir}/src/main/resources + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + diff --git a/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java b/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java new file mode 100644 index 0000000000..8c49ee8b27 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/LatencyMsg.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import lombok.Data; + +@Data +public class LatencyMsg { + + private long restClientLoginLatency; + private long deviceCredLatency; + private long getDeviceLatency; + + private long wsSubInitLatency; + + private long mqttConnectLatency; + private long mqttSendLatency; + private long mqttTotalLatency; + + private long httpSendLatency; + private long httpTotalLatency; + + private long mqttErrors; + private long mqttReconnects; + + + public boolean hasLongLatency(long threshold) { + return restClientLoginLatency > threshold + || deviceCredLatency > threshold + || getDeviceLatency > threshold + || wsSubInitLatency > threshold + || mqttConnectLatency > threshold + || mqttSendLatency > threshold + || mqttTotalLatency > threshold + || httpSendLatency > threshold + || httpTotalLatency > threshold; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java b/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java new file mode 100644 index 0000000000..17d196a5aa --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/SaasApi.java @@ -0,0 +1,234 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.http.ssl.TrustStrategy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpRequest; +import org.springframework.http.client.support.HttpRequestWrapper; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.rest.client.RestClient; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +@Component +@Slf4j +public class SaasApi { + + private static final long wsResponseWaitSec = 300; + + + @Value("${saas.host}") + private String host; + + private AtomicLong counter = new AtomicLong(); + private SampleMqttClient mqttClient; + private AtomicLong mqttErrors = new AtomicLong(); + private AtomicLong mqttReconnects = new AtomicLong(); + + public void checkMqtt(Device device, DeviceCredentials deviceCredentials, RestClient restClient, LatencyMsg latency) { + try { + WsClient wsClient = subscribeToWebSocket(device.getId(), "tsSubCmds", restClient, latency); + + long msgTs = System.currentTimeMillis(); + long submittedValue = counter.incrementAndGet(); + sendMqtt(deviceCredentials, submittedValue, msgTs, latency); + + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(wsResponseWaitSec); + if(actualLatestTelemetry == null) { + latency.setMqttTotalLatency(-1*wsResponseWaitSec); + } else { + validateWsResponse(actualLatestTelemetry, msgTs, submittedValue); + long responseReadyTs = System.currentTimeMillis(); + latency.setMqttTotalLatency(responseReadyTs - msgTs); + + } + wsClient.closeBlocking(); + latency.setMqttErrors(mqttErrors.getAndSet(0L)); + latency.setMqttReconnects(mqttReconnects.getAndSet(0L)); + } catch (Exception ex) { + throw new IllegalStateException("Could not check mqtt: " + ex.getMessage(), ex); + } + } + + public void checkHttp(Device device, DeviceCredentials deviceCredentials, RestClient restClient, LatencyMsg latency) { + try { + WsClient wsClient = subscribeToWebSocket(device.getId(), "tsSubCmds", restClient, latency); + long msgTs = System.currentTimeMillis(); + long submittedValue = counter.incrementAndGet(); + sendHttp(restClient, deviceCredentials, submittedValue, msgTs, latency); + WsTelemetryResponse actualLatestTelemetry = wsClient.getLastMessage(wsResponseWaitSec); + if(actualLatestTelemetry == null) { + latency.setHttpTotalLatency(-1*wsResponseWaitSec); + } else { + validateWsResponse(actualLatestTelemetry, msgTs, submittedValue); + long responseReadyTs = System.currentTimeMillis(); + latency.setHttpTotalLatency(responseReadyTs - msgTs); + } + wsClient.closeBlocking(); + } catch (Exception ex) { + throw new IllegalStateException("Could not check http: " + ex.getMessage(), ex); + } + } + + + private void validateWsResponse(WsTelemetryResponse response, long expectedTs, long expectedVal) { + try { + List values = response.getDataValuesByKey("checkKey"); + if (CollectionUtils.isEmpty(values)) { + throw new IllegalStateException("Ws response - no data"); + } + long actualTs = Long.parseLong(values.get(0).toString()); + long actualVal = Long.parseLong(values.get(1).toString()); + + if (actualTs != expectedTs) { + throw new IllegalStateException("Ws response - Ts not matched. Actual: " + actualTs + " Expected: " + expectedTs + " Delta: " + (expectedTs - actualTs)); + } + + if (actualVal != expectedVal) { + throw new IllegalStateException("Ws response - Value not matched. Actual: " + actualVal + " Expected: " + expectedVal + " Delta: " + (expectedVal - actualVal)); + } + } catch (Exception ex) { + throw new IllegalStateException("Could not validate WS response: " + response, ex); + } + } + + private void sendHttp(RestClient restClient, DeviceCredentials deviceCredentials, long value, long msgTs, LatencyMsg latency) { + try { + long start = System.currentTimeMillis(); + + RestTemplate restTemplate = new RestTemplate(); + restTemplate.setInterceptors(Collections.singletonList((httpRequest, bytes, clientHttpRequestExecution) -> { + HttpRequest wrapper = new HttpRequestWrapper(httpRequest); + wrapper.getHeaders().set("X-Authorization", "Bearer " + restClient.getToken()); + return clientHttpRequestExecution.execute(wrapper, bytes); + })); + + String payload = createPayload(msgTs, value).toString(); + + restTemplate.postForEntity("https://" + host + "/api/v1/" + deviceCredentials.getCredentialsId() + "/telemetry", payload, String.class); + + latency.setHttpSendLatency(System.currentTimeMillis() - start); + log.info("HTTP msg submitted"); + } catch (Exception ex) { + throw new IllegalStateException("Could not send http: " + ex.getMessage(), ex); + } + } + + private void sendMqtt(DeviceCredentials deviceCredentials, long value, long msgTs, LatencyMsg latency) { + try { + long start = System.currentTimeMillis(); + SampleMqttClient mqttClient = getMqttClient(deviceCredentials, latency); + + JsonObject payload = createPayload(msgTs, value); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.readTree(payload.toString()); + mqttClient.publishTelemetry(objectMapper.readTree(payload.toString())); + latency.setMqttSendLatency(System.currentTimeMillis() - start); + log.info("Mqtt msg submitted"); + } catch (Exception ex) { + throw new IllegalStateException("Could not send mqtt: " + ex.getMessage(), ex); + } + } + + private WsClient subscribeToWebSocket(DeviceId deviceId, String property, RestClient restClient, LatencyMsg latency) { + try { + long start = System.currentTimeMillis(); + WsClient wsClient = new WsClient(new URI("wss://" + host + "/api/ws/plugins/telemetry?token=" + restClient.getToken())); + SSLContextBuilder builder = SSLContexts.custom(); + builder.loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true); + wsClient.setSocketFactory(builder.build().getSocketFactory()); + wsClient.connectBlocking(); + + JsonObject cmdsObject = new JsonObject(); + cmdsObject.addProperty("entityType", EntityType.DEVICE.name()); + cmdsObject.addProperty("entityId", deviceId.toString()); + cmdsObject.addProperty("scope", "LATEST_TELEMETRY"); + cmdsObject.addProperty("cmdId", new Random().nextInt(100)); + + JsonArray cmd = new JsonArray(); + cmd.add(cmdsObject); + JsonObject wsRequest = new JsonObject(); + wsRequest.add(property, cmd); + wsClient.send(wsRequest.toString()); + wsClient.waitForFirstReply(); + latency.setWsSubInitLatency(System.currentTimeMillis() - start); + log.info("Ws subscription created"); + return wsClient; + } catch (Exception ex) { + throw new IllegalStateException("Could not subscribe to WS: " + ex.getMessage(), ex); + } + } + + private SampleMqttClient getMqttClient(DeviceCredentials deviceCredentials, LatencyMsg latency) { + try { + long start = System.currentTimeMillis(); + if (mqttClient == null || !mqttClient.nativeClient.isConnected()) { + if (mqttClient != null) { + mqttReconnects.incrementAndGet(); + try { + mqttClient.disconnect(); + } catch (Exception ex) { + log.error("fail disconnect mqtt", ex); + } + } + String uri = "tcp://" + host + ":1883"; + String deviceTmpName = "health check device"; + String token = deviceCredentials.getCredentialsId(); + mqttClient = new SampleMqttClient(uri, deviceTmpName, token, mqttErrors); + boolean connected = mqttClient.connect(); + if (!connected) { + throw new IllegalStateException("Could not connect mqtt nativeClient"); + } + } + latency.setMqttConnectLatency(System.currentTimeMillis() - start); + return mqttClient; + } catch (Exception ex) { + throw new IllegalStateException("Could not create mqtt nativeClient: " + ex.getMessage(), ex); + } + } + + private static JsonObject createPayload(long ts, long val) { + JsonObject values = createPayload(val); + JsonObject payload = new JsonObject(); + payload.addProperty("ts", ts); + payload.add("values", values); + return payload; + } + + private static JsonObject createPayload(long val) { + JsonObject values = new JsonObject(); + values.addProperty("checkKey", val); + + return values; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java b/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java new file mode 100644 index 0000000000..39f249aa4a --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/SaasHealthChecker.java @@ -0,0 +1,233 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import com.google.common.collect.Lists; +import com.google.gson.JsonObject; +import lombok.extern.slf4j.Slf4j; +import org.crawler.license.service.NotifyService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpRequest; +import org.springframework.http.client.support.HttpRequestWrapper; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.rest.client.RestClient; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.security.DeviceCredentials; + +import javax.annotation.PostConstruct; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +@Component +@Slf4j +public class SaasHealthChecker { + + // private String url = ""; + private String username = "paromskiy@gmail.com"; + private String pass = "123test123"; + private String deviceName = "healthcheckDevice"; + private String dashboardId = ""; + + @Value("${saas.host}") + private String host; + + @Value("${saas.check.interval.sec}") + private long refreshIntervalSec; + + @Value("${saas.notifyThreshold.ms}") + private long notifyThreshold; + + @Autowired + private NotifyService notifyService; + + @Autowired + private SaasApi saasApi; + + private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private AtomicLong counter = new AtomicLong(); + private long logStatsIntervalMs = TimeUnit.HOURS.toMillis(12); + private long lastLogStatsTime = System.currentTimeMillis(); + private long lastLatencyNotification = 0; + private boolean inFailedState = false; + + private List latestLatencies = Lists.newArrayList(); + private long failsCount = 0; + + @PostConstruct + public void init() throws URISyntaxException { + check(); + executor.scheduleWithFixedDelay(() -> { + try { + log.info("Start saas healthcheck"); + check(); + counter.incrementAndGet(); + log.info("Finish saas healthcheck"); +// if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) == 1) { +// notifyService.sendInSlackSaasHealth("I AM ALIVE. Checks performed: " + counter.get()); +// } + if(System.currentTimeMillis() - lastLogStatsTime > logStatsIntervalMs) { + lastLogStatsTime = System.currentTimeMillis(); + logIntervalStats(); + notifyService.sendInSlackSaasHealth("I AM ALIVE. Checks performed: " + counter.get()); + } + } catch (Exception ex) { + log.error("Scheduled error", ex); + } + + }, refreshIntervalSec, refreshIntervalSec, TimeUnit.SECONDS); + } + + private void check() throws URISyntaxException { + try { + // load dashboard + LatencyMsg latency = new LatencyMsg(); + RestClient restClient = buildClient(latency); + Device device = getDevice(restClient, latency); + DeviceCredentials deviceCred = getDeviceCred(restClient, device, latency); + + + saasApi.checkMqtt(device, deviceCred, restClient, latency); + saasApi.checkHttp(device, deviceCred, restClient, latency); + + + processLatencyObj(latency, restClient, deviceCred); + latestLatencies.add(latency); + if (inFailedState) { + inFailedState = false; + notifyService.sendInSlackSaasHealth("service restored"); + } + } catch (Exception ex) { + failsCount++; + log.error("Error while check Saas health", ex); + if (!inFailedState || System.currentTimeMillis() - lastLatencyNotification > TimeUnit.MINUTES.toHours(15)) { + String msg = "########################## \n ERROR \n " + ex.getMessage(); + lastLatencyNotification = System.currentTimeMillis(); + inFailedState = true; + notifyService.sendInSlackSaasHealth(msg); + } + } + } + + private void processLatencyObj(LatencyMsg latency, RestClient restClient, DeviceCredentials credentials) throws URISyntaxException { + System.out.println(); + System.out.println(latency); + System.out.println(); + + + JsonObject values = new JsonObject(); + values.addProperty("restClientLoginLatency", latency.getRestClientLoginLatency()); + values.addProperty("deviceCredLatency", latency.getDeviceCredLatency()); + values.addProperty("getDeviceLatency", latency.getGetDeviceLatency()); + values.addProperty("wsSubInitLatency", latency.getWsSubInitLatency()); + values.addProperty("mqttConnectLatency", latency.getMqttConnectLatency()); + values.addProperty("mqttSendLatency", latency.getMqttSendLatency()); + values.addProperty("mqttTotalLatency", latency.getMqttTotalLatency()); + values.addProperty("httpSendLatency", latency.getHttpSendLatency()); + values.addProperty("httpTotalLatency", latency.getHttpTotalLatency()); + values.addProperty("mqttErrors", latency.getMqttErrors()); + values.addProperty("mqttReconnects", latency.getMqttReconnects()); + + JsonObject payload = new JsonObject(); + payload.addProperty("ts", System.currentTimeMillis()); + payload.add("values", values); + + RestTemplate restTemplate = new RestTemplate(); + restTemplate.setInterceptors(Collections.singletonList((httpRequest, bytes, clientHttpRequestExecution) -> { + HttpRequest wrapper = new HttpRequestWrapper(httpRequest); + wrapper.getHeaders().set("X-Authorization", "Bearer " + restClient.getToken()); + return clientHttpRequestExecution.execute(wrapper, bytes); + })); + + + restTemplate.postForEntity("https://" + host + "/api/v1/" + credentials.getCredentialsId() + "/telemetry", payload.toString(), String.class); + + if (latency.hasLongLatency(notifyThreshold) && System.currentTimeMillis() - lastLatencyNotification > TimeUnit.MINUTES.toHours(60)) { + lastLatencyNotification = System.currentTimeMillis(); + notifyService.sendInSlackSaasHealth("Some SaaS latencies are upper threshold [" + notifyThreshold + "ms] \n " + latency); + } + } + + private Device getDevice(RestClient restClient, LatencyMsg latencyMsg) { + long start = System.currentTimeMillis(); + Optional tenantDevice = restClient.getTenantDevice(deviceName); + Device device = tenantDevice.orElseThrow(() -> new IllegalStateException("Device [" + deviceName + "] was not found")); + latencyMsg.setGetDeviceLatency(System.currentTimeMillis() - start); + log.info("Device loaded"); + return device; + } + + private DeviceCredentials getDeviceCred(RestClient client, Device device, LatencyMsg latencyMsg) { + long start = System.currentTimeMillis(); + Optional credentialsOptional = client.getDeviceCredentialsByDeviceId(device.getId()); + DeviceCredentials credentials = credentialsOptional.orElseThrow(() -> new IllegalStateException("Could not load device credentials")); + latencyMsg.setDeviceCredLatency(System.currentTimeMillis() - start); + log.info("Device cred loaded"); + return credentials; + } + + private RestClient buildClient(LatencyMsg latencyMsg) { + try { + long start = System.currentTimeMillis(); + RestClient client = new RestClient("https://" + host); + client.login(username, pass); + latencyMsg.setRestClientLoginLatency(System.currentTimeMillis() - start); + log.info("Rest client ready"); + return client; + } catch (Exception ex) { + throw new IllegalStateException("Could not login: " + ex.getMessage(), ex); + } + } + + private void logIntervalStats() throws URISyntaxException { + List httpTotal = latestLatencies.stream().map(LatencyMsg::getHttpTotalLatency).collect(Collectors.toList()); + List login = latestLatencies.stream().map(LatencyMsg::getRestClientLoginLatency).collect(Collectors.toList()); + List getDevice = latestLatencies.stream().map(LatencyMsg::getDeviceCredLatency).collect(Collectors.toList()); + long mqttErrorsCnt = latestLatencies.stream().mapToLong(LatencyMsg::getMqttErrors).sum(); + long mqttReconectCnt = latestLatencies.stream().mapToLong(LatencyMsg::getMqttReconnects).sum(); + String msg = "[Interval stats] Fails count: " + failsCount + " ok count: " + latestLatencies.size() + "\n"; + if (latestLatencies.size() > 0) { + msg += "\t http send -receive dalay: " + toPercentile(httpTotal) + "\n"; + msg += "\t http login: " + toPercentile(login) + "\n"; + msg += "\t get device: " + toPercentile(getDevice) + "\n"; + msg += "\t mqtt errors: " + mqttErrorsCnt + "\n"; + msg += "\t mqtt reconnects: " + mqttReconectCnt + "\n"; + } + failsCount = 0; + latestLatencies.clear(); + + log.info("Interval stats: {}" , msg); + notifyService.sendInSlackSaasHealth(msg); + } + + private String toPercentile(List values) { + return "75%: " + percentile(values, 75) + "ms; \t90%:" + percentile(values, 90) + "ms; \t95%:" + percentile(values, 95) + "ms; \tmin: " + percentile(values, 1) + "ms; \tmax: " + percentile(values, 99.99); + } + + private static Long percentile(List values, double percentile) { + Collections.sort(values); + int index = (int) Math.ceil(percentile / 100.0 * values.size()); + return (long) ((Math.round(values.get(index - 1) * 100)) / 100d); + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java b/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java new file mode 100644 index 0000000000..e0dbc398c1 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/SampleMqttClient.java @@ -0,0 +1,112 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttActionListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +public class SampleMqttClient { + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + @Getter + private final String deviceToken; + @Getter + private final String deviceName; + @Getter + private final String clientId; + private final MqttClientPersistence persistence; + public final MqttAsyncClient nativeClient; + public final AtomicLong failCount; + + + + public SampleMqttClient(String uri, String deviceName, String deviceToken, AtomicLong failCount) throws Exception { + this.clientId = MqttAsyncClient.generateClientId(); + this.deviceToken = deviceToken; + this.deviceName = deviceName; + this.failCount = failCount; + this.persistence = new MemoryPersistence(); + this.nativeClient = new MqttAsyncClient(uri, clientId, persistence); + } + + public boolean connect() throws Exception { + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(deviceToken); + try { + nativeClient.connect(options, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken iMqttToken) { + log.info("[{}] connected to Thingsboard!", deviceName); + } + + @Override + public void onFailure(IMqttToken iMqttToken, Throwable e) { + failCount.incrementAndGet(); + log.error("[{}] failed to connect to Thingsboard!", deviceName, e); + } + }).waitForCompletion(); + } catch (MqttException e) { + log.error("Failed to connect to the server", e); + } + return nativeClient.isConnected(); + } + + public void disconnect() throws Exception { + nativeClient.disconnect().waitForCompletion(); + } + + public void publishAttributes(JsonNode data) throws Exception { + publish("v1/devices/me/attributes", data, true); + } + + public void publishTelemetry(JsonNode data) throws Exception { + publish("v1/devices/me/telemetry", data, false); + } + + private void publish(String topic, JsonNode data, boolean sync) throws Exception { + MqttMessage msg = new MqttMessage(MAPPER.writeValueAsString(data).getBytes(StandardCharsets.UTF_8)); + IMqttDeliveryToken deliveryToken = nativeClient.publish(topic, msg, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + log.trace("Data updated!"); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + log.error("[{}] Data update failed!", deviceName, exception); + } + }); + if (sync) { + deliveryToken.waitForCompletion(); + } + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/WsClient.java b/monitoring/src/main/java/org/thingsboard/aba/WsClient.java new file mode 100644 index 0000000000..fdbedc2eee --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/WsClient.java @@ -0,0 +1,102 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + +import javax.net.ssl.SSLParameters; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class WsClient extends WebSocketClient { + + private static final ObjectMapper mapper = new ObjectMapper(); + private WsTelemetryResponse message; + + private volatile boolean firstReplyReceived; + private CountDownLatch firstReply = new CountDownLatch(1); + private CountDownLatch latch = new CountDownLatch(1); + + WsClient(URI serverUri) { + super(serverUri); + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + } + + @Override + public void onMessage(String message) { + log.info("Ws on message {}", message); + if (!firstReplyReceived) { + firstReplyReceived = true; + firstReply.countDown(); + } else { + try { + WsTelemetryResponse response = mapper.readValue(message, WsTelemetryResponse.class); + response.setArriveTs(System.currentTimeMillis()); + if (!response.getData().isEmpty()) { + this.message = response; + latch.countDown(); + } + } catch (IOException e) { + log.error("ws message can't be read"); + } + } + } + + @Override + public void onClose(int code, String reason, boolean remote) { + log.info("ws is closed, due to [{}]", reason); + } + + @Override + public void onError(Exception ex) { + ex.printStackTrace(); + } + + + public WsTelemetryResponse getLastMessage(long timeoutSec) { + try { + latch.await(timeoutSec, TimeUnit.SECONDS); + log.info("Ws response received"); + return this.message; + } catch (InterruptedException e) { + log.error("Timeout, ws message wasn't received"); + } + return null; + } + + void waitForFirstReply() { + try { + firstReply.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Timeout, ws message wasn't received"); + throw new RuntimeException(e); + } + } + + @Override + protected void onSetSSLParameters(SSLParameters sslParameters) { + sslParameters.setEndpointIdentificationAlgorithm(null); + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java b/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java new file mode 100644 index 0000000000..a122a8c2aa --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/WsTelemetryResponse.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.aba; + +import lombok.Data; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Data +public class WsTelemetryResponse { + + private int subscriptionId; + private int errorCode; + private String errorMsg; + private Map>> data; + private Map latestValues; + private long arriveTs; + + public List getDataValuesByKey(String key) { + return data.entrySet().stream() + .filter(e -> e.getKey().equals(key)) + .flatMap(e -> e.getValue().stream().flatMap(Collection::stream)) + .collect(Collectors.toList()); + } +} diff --git a/monitoring/src/main/java/org/thingsboard/aba/application.yml b/monitoring/src/main/java/org/thingsboard/aba/application.yml new file mode 100644 index 0000000000..0e8ccbf6cc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/aba/application.yml @@ -0,0 +1,28 @@ +# +# Copyright © 2016-2022 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server.address: "${HTTP_BIND_ADDRESS:0.0.0.0}" +server.port: "${HTTP_BIND_PORT:9712}" + +server.compression.enabled: true +server.compression.mime-types: "text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json" +server.compression.min-response-size: 1024 + + +saas.host: "${SAAS_CHECK_HOST:thingsboard.cloud}" +saas.check.interval.sec: "${SAAS_CHECK_INTERVAL_SEC:60}" +saas.slack.token: "${SAAS_SLACK_TOKEN:T2QD8CLUS/B02TP4E0Y02/RenftHjCFYdVlOU8OiPGOhJV}" +saas.notifyThreshold.ms: "${SAAS_NOTIFY_THRESHOLD_MS:28000}" diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java new file mode 100644 index 0000000000..b177614aba --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java @@ -0,0 +1,68 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.monitoring.config.service.TransportMonitoringServiceConfig; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@SpringBootApplication +@EnableScheduling +public class ThingsboardMonitoringApplication { + + public static void main(String[] args) { + new SpringApplicationBuilder(ThingsboardMonitoringApplication.class) + .properties(Map.of("spring.config.name", "tb-monitoring")) + .run(args); + } + + @Bean + public ApplicationRunner initMonitoringServices(List configs, ApplicationContext context) { + return args -> { + configs.forEach(config -> { + config.getTargets().stream() + .filter(target -> StringUtils.isNotBlank(target.getBaseUrl())) + .forEach(target -> { + context.getBean(config.getTransportType().getMonitoringServiceClass(), config, target); + }); + }); + }; + } + + @Bean + public ScheduledExecutorService monitoringExecutor(List configs) { + int targetsCount = configs.stream().mapToInt(config -> config.getTargets().size()).sum(); + return Executors.newScheduledThreadPool(targetsCount, ThingsBoardThreadFactory.forName("monitoring-executor")); + } + + @Bean + public ExecutorService requestExecutor() { + return Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("request-executor")); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java new file mode 100644 index 0000000000..e608bbc28a --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/Lwm2mClient.java @@ -0,0 +1,182 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.californium.core.network.CoapEndpoint; +import org.eclipse.californium.core.network.config.NetworkConfig; +import org.eclipse.californium.core.observe.ObservationStore; +import org.eclipse.californium.scandium.DTLSConnector; +import org.eclipse.californium.scandium.config.DtlsConnectorConfig; +import org.eclipse.leshan.client.californium.LeshanClient; +import org.eclipse.leshan.client.californium.LeshanClientBuilder; +import org.eclipse.leshan.client.engine.DefaultRegistrationEngineFactory; +import org.eclipse.leshan.client.object.Security; +import org.eclipse.leshan.client.object.Server; +import org.eclipse.leshan.client.resource.BaseInstanceEnabler; +import org.eclipse.leshan.client.resource.DummyInstanceEnabler; +import org.eclipse.leshan.client.resource.ObjectsInitializer; +import org.eclipse.leshan.client.servers.ServerIdentity; +import org.eclipse.leshan.core.californium.EndpointFactory; +import org.eclipse.leshan.core.model.InvalidDDFFileException; +import org.eclipse.leshan.core.model.LwM2mModel; +import org.eclipse.leshan.core.model.ObjectLoader; +import org.eclipse.leshan.core.model.ObjectModel; +import org.eclipse.leshan.core.model.StaticModel; +import org.eclipse.leshan.core.node.codec.DefaultLwM2mDecoder; +import org.eclipse.leshan.core.node.codec.DefaultLwM2mEncoder; +import org.eclipse.leshan.core.response.ReadResponse; + +import javax.security.auth.Destroyable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.eclipse.leshan.client.object.Security.noSec; +import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL; +import static org.eclipse.leshan.core.LwM2mId.DEVICE; +import static org.eclipse.leshan.core.LwM2mId.SECURITY; +import static org.eclipse.leshan.core.LwM2mId.SERVER; + +@Slf4j +public class Lwm2mClient extends BaseInstanceEnabler implements Destroyable { + + @Getter + @Setter + private String name; + + @Getter + @Setter + private LeshanClient leshanClient; + + private List models; + private Security security; + private NetworkConfig coapConfig; + + private static final List supportedResources = Collections.singletonList(2); + + private String data = UUID.randomUUID().toString(); + + private String serverUri; + private String endpoint; + + public Lwm2mClient(String serverUri, String endpoint) { + this.serverUri = serverUri; + this.endpoint = endpoint; + } + + public Lwm2mClient() { + } + + public void initClient() throws InvalidDDFFileException, IOException { + String[] resources = new String[]{"0.xml", "1.xml", "2.xml", "3.xml"}; + models = new ArrayList<>(); + for (String resourceName : resources) { + models.addAll(ObjectLoader.loadDdfFile(getClass().getClassLoader().getResourceAsStream("lwm2m/" + resourceName), resourceName)); + } + + security = noSec(serverUri, 123); + coapConfig = new NetworkConfig().setString("COAP_PORT", StringUtils.substringAfterLast(serverUri, ":")); + + + setName(endpoint); + + LeshanClient leshanClient; + + LwM2mModel model = new StaticModel(models); + ObjectsInitializer initializer = new ObjectsInitializer(model); + initializer.setInstancesForObject(SECURITY, security); + initializer.setInstancesForObject(SERVER, new Server(123, 300)); + initializer.setInstancesForObject(DEVICE, this); + initializer.setClassForObject(ACCESS_CONTROL, DummyInstanceEnabler.class); + DtlsConnectorConfig.Builder dtlsConfig = new DtlsConnectorConfig.Builder(); + dtlsConfig.setRecommendedCipherSuitesOnly(true); + dtlsConfig.setClientOnly(); + + DefaultRegistrationEngineFactory engineFactory = new DefaultRegistrationEngineFactory(); + engineFactory.setReconnectOnUpdate(false); + engineFactory.setResumeOnConnect(true); + + EndpointFactory endpointFactory = new EndpointFactory() { + + @Override + public CoapEndpoint createUnsecuredEndpoint(InetSocketAddress address, NetworkConfig coapConfig, + ObservationStore store) { + CoapEndpoint.Builder builder = new CoapEndpoint.Builder(); + builder.setInetSocketAddress(address); + builder.setNetworkConfig(coapConfig); + return builder.build(); + } + + @Override + public CoapEndpoint createSecuredEndpoint(DtlsConnectorConfig dtlsConfig, NetworkConfig coapConfig, + ObservationStore store) { + CoapEndpoint.Builder builder = new CoapEndpoint.Builder(); + DtlsConnectorConfig.Builder dtlsConfigBuilder = new DtlsConnectorConfig.Builder(dtlsConfig); + builder.setConnector(new DTLSConnector(dtlsConfigBuilder.build())); + builder.setNetworkConfig(coapConfig); + return builder.build(); + } + }; + + LeshanClientBuilder builder = new LeshanClientBuilder(endpoint); + builder.setObjects(initializer.createAll()); + builder.setCoapConfig(coapConfig); + builder.setDtlsConfig(dtlsConfig); + builder.setRegistrationEngineFactory(engineFactory); + builder.setEndpointFactory(endpointFactory); + builder.setDecoder(new DefaultLwM2mDecoder(false)); + builder.setEncoder(new DefaultLwM2mEncoder(false)); + leshanClient = builder.build(); + + setLeshanClient(leshanClient); + + leshanClient.start(); + } + + @Override + public ReadResponse read(ServerIdentity identity, int resourceId) { + if (resourceId == 2) { + return ReadResponse.success(resourceId, data); + } + return ReadResponse.notFound(); + } + + @Override + public List getAvailableResourceIds(ObjectModel model) { + return supportedResources; + } + + @SneakyThrows + public void send(String data) { + this.data = data; + fireResourcesChange(2); + } + + @Override + public void destroy() { + if (leshanClient != null) { + leshanClient.destroy(true); + } + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java new file mode 100644 index 0000000000..1a723e2db8 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClientFactory.java @@ -0,0 +1,14 @@ +package org.thingsboard.monitoring.client; + +import org.springframework.beans.factory.annotation.Lookup; +import org.springframework.stereotype.Component; + +@Component +public class TbClientFactory { + + @Lookup + public TbRestClient createClient() { + return null; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java new file mode 100644 index 0000000000..17df7d7801 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbRestClient.java @@ -0,0 +1,43 @@ +package org.thingsboard.monitoring.client; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.data.TransportInfo; +import org.thingsboard.rest.client.RestClient; +import org.thingsboard.server.common.data.Device; + +import java.time.Duration; + +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class TbRestClient extends RestClient { + + @Value("${monitoring.auth.username}") + private String username; + @Value("${monitoring.auth.password}") + private String password; + + public TbRestClient(@Value("${monitoring.auth.base_url}") String baseUrl) { + super(new RestTemplateBuilder() + .setConnectTimeout(Duration.ofSeconds(5)) + .setReadTimeout(Duration.ofSeconds(2)) + .build(), baseUrl); + } + + public String logIn() { + login(username, password); + return getToken(); + } + + public Device createDeviceForMonitoringIfNotExists(TransportInfo transportInfo, String deviceName) { +// getTenantDevice(name) +// .orElseGet(() -> { +// Device device = +// }) + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java new file mode 100644 index 0000000000..d8e2cbdfec --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java @@ -0,0 +1,123 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.client; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.data.cmd.CmdsWrapper; +import org.thingsboard.monitoring.data.cmd.TimeseriesSubscriptionCmd; +import org.thingsboard.monitoring.data.cmd.TimeseriesUpdate; + +import java.net.URI; +import java.nio.channels.NotYetConnectedException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class WsClient extends WebSocketClient { + + private volatile String lastMsg; + private CountDownLatch reply; + private CountDownLatch update; + + public WsClient(URI serverUri) { + super(serverUri); + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + + } + + @Override + public void onMessage(String s) { + lastMsg = s; + if (reply != null) { + reply.countDown(); + } + if (update != null) { + update.countDown(); + } + } + + @Override + public void onClose(int i, String s, boolean b) { + log.debug("WebSocket client is closed"); + } + + @Override + public void onError(Exception e) { + log.error("WebSocket client error:", e); + } + + public void registerWaitForUpdate() { + lastMsg = null; + update = new CountDownLatch(1); + } + + @Override + public void send(String text) throws NotYetConnectedException { + reply = new CountDownLatch(1); + super.send(text); + } + + public void subscribeForTelemetry(UUID deviceId, String telemetryKey) { + TimeseriesSubscriptionCmd subCmd = new TimeseriesSubscriptionCmd(); + subCmd.setEntityType("DEVICE"); + subCmd.setEntityId(deviceId.toString()); + subCmd.setScope("LATEST_TELEMETRY"); + subCmd.setKeys(telemetryKey); + subCmd.setCmdId(RandomUtils.nextInt(0, 100)); + + CmdsWrapper wrapper = new CmdsWrapper(); + wrapper.setTsSubCmds(List.of(subCmd)); + send(JacksonUtil.toString(wrapper)); + } + + public String waitForUpdate(long ms) { + try { + if (update.await(ms, TimeUnit.MILLISECONDS)) { + return lastMsg; + } + } catch (InterruptedException e) { + log.debug("Failed to await reply", e); + } + return null; + } + + public String waitForReply(int ms) { + try { + if (reply.await(ms, TimeUnit.MILLISECONDS)) { + return lastMsg; + } + } catch (InterruptedException e) { + log.debug("Failed to await reply", e); + } + return null; + } + + public Object getTelemetryKeyUpdate(String key) { + if (lastMsg == null) return null; + TimeseriesUpdate update = JacksonUtil.fromString(lastMsg, TimeseriesUpdate.class); + return update.getLatest(key); + } + +} \ No newline at end of file diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java new file mode 100644 index 0000000000..1a16094dd9 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/DeviceConfig.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +import java.util.UUID; + +@Data +public class DeviceConfig { + private UUID id; + private String accessToken; + + public void setId(String id) { + this.id = StringUtils.isNotEmpty(id) ? UUID.fromString(id) : null; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java new file mode 100644 index 0000000000..afbe62bb05 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/MonitoringTargetConfig.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.Data; + +@Data +public class MonitoringTargetConfig { + + private String baseUrl; + private DeviceConfig device; + + @Override + public String toString() { + return "Monitoring target [base url: '" + baseUrl + "', device: " + device.getId() + "]"; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java new file mode 100644 index 0000000000..fc93c6d71e --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/TransportType.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.monitoring.service.TransportMonitoringService; +import org.thingsboard.monitoring.service.impl.CoapTransportMonitoringService; +import org.thingsboard.monitoring.service.impl.HttpTransportMonitoringService; +import org.thingsboard.monitoring.service.impl.Lwm2mTransportMonitoringService; +import org.thingsboard.monitoring.service.impl.MqttTransportMonitoringService; + +@AllArgsConstructor +@Getter +public enum TransportType { + MQTT(MqttTransportMonitoringService.class), + COAP(CoapTransportMonitoringService.class), + LWM2M(Lwm2mTransportMonitoringService.class), + HTTP(HttpTransportMonitoringService.class); + + private final Class> monitoringServiceClass; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java new file mode 100644 index 0000000000..d4efe64091 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/WsConfig.java @@ -0,0 +1,18 @@ +package org.thingsboard.monitoring.config; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +@Getter +public class WsConfig { + + @Value("${monitoring.ws.base_url}") + private String baseUrl; + @Value("${monitoring.ws.request_timeout_ms}") + private int requestTimeoutMs; + @Value("${monitoring.ws.check_timeout_ms}") + private int resultCheckTimeoutMs; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java new file mode 100644 index 0000000000..ad58430892 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/CoapTransportMonitoringServiceConfig.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.rest.client.RestClient; + +import javax.annotation.PostConstruct; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.coap.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.coap") +public class CoapTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + + @Override + public TransportType getTransportType() { + return TransportType.COAP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java new file mode 100644 index 0000000000..49ec69191b --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/HttpTransportMonitoringServiceConfig.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.http.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.http") +public class HttpTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + @Override + public TransportType getTransportType() { + return TransportType.HTTP; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/Lwm2mTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/Lwm2mTransportMonitoringServiceConfig.java new file mode 100644 index 0000000000..689223c2bf --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/Lwm2mTransportMonitoringServiceConfig.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.lwm2m.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.lwm2m") +public class Lwm2mTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + @Override + public TransportType getTransportType() { + return TransportType.LWM2M; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java new file mode 100644 index 0000000000..348be0146d --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/MqttTransportMonitoringServiceConfig.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import lombok.Data; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; +import org.thingsboard.monitoring.config.TransportType; + +@Component +@ConditionalOnProperty(name = "monitoring.transports.mqtt.enabled", havingValue = "true") +@ConfigurationProperties(prefix = "monitoring.transports.mqtt") +@Data +public class MqttTransportMonitoringServiceConfig extends TransportMonitoringServiceConfig { + private Integer qos; + + @Override + public TransportType getTransportType() { + return TransportType.MQTT; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java new file mode 100644 index 0000000000..7cdbb78ad1 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/service/TransportMonitoringServiceConfig.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.config.service; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; + +import java.util.List; + +@Data +public abstract class TransportMonitoringServiceConfig { + private int monitoringRateMs; + private int requestTimeoutMs; + private int initialDelayMs; + @Value("${monitoring.failure_threshold}") + private int failureThreshold; + + private List targets; + + public abstract TransportType getTransportType(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java new file mode 100644 index 0000000000..e3e94c05dc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/MonitoringStats.java @@ -0,0 +1,4 @@ +package org.thingsboard.monitoring.data; + +public class MonitoringStats { +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java new file mode 100644 index 0000000000..099fd1cf7a --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/TransportInfo.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data; + +import lombok.Data; +import org.thingsboard.monitoring.config.TransportType; + +@Data +public class TransportInfo { + private final TransportType transportType; + private final String url; +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java new file mode 100644 index 0000000000..cc0c482d3e --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/CmdsWrapper.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import lombok.Data; + +import java.util.List; + +@Data +public class CmdsWrapper { + + private List tsSubCmds; + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java new file mode 100644 index 0000000000..d527e8876a --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesSubscriptionCmd.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.cmd; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class TimeseriesSubscriptionCmd { + + private int cmdId; + private String entityType; + private String entityId; + private String keys; + private String scope; + + public String getType() { + return "TIMESERIES"; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java new file mode 100644 index 0000000000..7dc24115eb --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/TimeseriesUpdate.java @@ -0,0 +1,37 @@ +package org.thingsboard.monitoring.data.cmd; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; +import org.springframework.data.util.Pair; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TimeseriesUpdate { + + private final int cmdId; + private final int errorCode; + private final String errorMsg; + private Map>> data; + + public Object getLatest(String key) { + if (!data.containsKey(key)) return null; + return data.get(key).stream() + .map(tsAndValue -> { + if (tsAndValue == null || tsAndValue.size() != 2) { + return null; + } + long ts = Long.parseLong(tsAndValue.get(0).toString()); + Object value = tsAndValue.get(1); + return Pair.of(ts, value); + }) + .filter(Objects::nonNull) + .max(Comparator.comparing(Pair::getFirst)) + .map(Pair::getSecond).orElse(null); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java new file mode 100644 index 0000000000..2c252266cc --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringFailureNotificationInfo.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import lombok.Getter; +import org.thingsboard.monitoring.data.TransportInfo; + +@Getter +public class MonitoringFailureNotificationInfo extends NotificationInfo { + private final Exception error; + + public MonitoringFailureNotificationInfo(TransportInfo transportInfo, Exception error) { + super(transportInfo); + this.error = error; + } + + @Override + public NotificationType getType() { + return NotificationType.MONITORING_FAILURE; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java new file mode 100644 index 0000000000..16bf577f71 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/MonitoringRecoveryNotificationInfo.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import org.thingsboard.monitoring.data.TransportInfo; + +public class MonitoringRecoveryNotificationInfo extends NotificationInfo { + public MonitoringRecoveryNotificationInfo(TransportInfo transportInfo) { + super(transportInfo); + } + + @Override + public NotificationType getType() { + return NotificationType.MONITORING_RECOVERY; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java new file mode 100644 index 0000000000..99d47f1345 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationInfo.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import lombok.Data; +import org.thingsboard.monitoring.data.TransportInfo; + +@Data +public abstract class NotificationInfo { + private final TransportInfo transportInfo; + + public abstract NotificationType getType(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java new file mode 100644 index 0000000000..4a5bfffdd1 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/NotificationType.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +public enum NotificationType { + TRANSPORT_FAILURE, + MONITORING_FAILURE, + TRANSPORT_RECOVERY, + MONITORING_RECOVERY; +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java new file mode 100644 index 0000000000..29e68b27de --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportFailureNotificationInfo.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import lombok.Getter; +import org.thingsboard.monitoring.data.TransportInfo; + +@Getter +public class TransportFailureNotificationInfo extends NotificationInfo { + private final Exception error; + + public TransportFailureNotificationInfo(TransportInfo transportInfo, Exception error) { + super(transportInfo); + this.error = error; + } + + @Override + public NotificationType getType() { + return NotificationType.TRANSPORT_FAILURE; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java new file mode 100644 index 0000000000..1c35944d02 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/TransportRecoveryNotificationInfo.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.data.notification; + +import org.thingsboard.monitoring.data.TransportInfo; + +public class TransportRecoveryNotificationInfo extends NotificationInfo { + public TransportRecoveryNotificationInfo(TransportInfo transportInfo) { + super(transportInfo); + } + + @Override + public NotificationType getType() { + return NotificationType.TRANSPORT_RECOVERY; + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java new file mode 100644 index 0000000000..18bada188f --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.monitoring.notification.channels.NotificationChannel; +import org.thingsboard.monitoring.data.notification.NotificationInfo; + +import java.util.List; + +@Service +@RequiredArgsConstructor +@Slf4j +public class NotificationService { + private final List notificationChannels; + + public void notify(NotificationInfo notificationInfo) { + notificationChannels.forEach(notificationChannel -> { + try { + notificationChannel.sendNotification(notificationInfo); + } catch (Exception e) { + log.error("Failed to send notification to {} ({})", notificationChannel.getClass().getSimpleName(), notificationInfo, e); + } + }); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java new file mode 100644 index 0000000000..259ffdf9b8 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification.channels; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.thingsboard.monitoring.data.notification.MonitoringFailureNotificationInfo; +import org.thingsboard.monitoring.data.notification.NotificationInfo; +import org.thingsboard.monitoring.data.notification.TransportFailureNotificationInfo; + +@Slf4j +public abstract class NotificationChannel { + + public abstract void sendNotification(NotificationInfo notificationInfo); + + + protected String createNotificationMessage(NotificationInfo notificationInfo) { + String message = String.format("[%s transport (%s)]", notificationInfo.getTransportInfo().getTransportType(), notificationInfo.getTransportInfo().getUrl()); + + switch (notificationInfo.getType()) { + case TRANSPORT_FAILURE: + TransportFailureNotificationInfo transportFailureNotificationInfo = (TransportFailureNotificationInfo) notificationInfo; + message += " Transport failure: " + getErrorMessage(transportFailureNotificationInfo.getError()); + break; + case MONITORING_FAILURE: + MonitoringFailureNotificationInfo monitoringFailureNotificationInfo = (MonitoringFailureNotificationInfo) notificationInfo; + message += " Monitoring failure: " + getErrorMessage(monitoringFailureNotificationInfo.getError()); + break; + case TRANSPORT_RECOVERY: + message += " Transport is now working"; + break; + case MONITORING_RECOVERY: + message += " Monitoring is now working"; + break; + default: + throw new UnsupportedOperationException("Notification type " + notificationInfo.getType() + " not supported"); + } + + return message; + } + + protected String getErrorMessage(Exception error) { + return ExceptionUtils.getMessage(error); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java new file mode 100644 index 0000000000..39f8cbf8d5 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java @@ -0,0 +1,54 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.notification.channels.impl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.monitoring.notification.channels.NotificationChannel; +import org.thingsboard.monitoring.data.notification.NotificationInfo; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Map; + +@Component +@ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true") +@Slf4j +public class SlackNotificationChannel extends NotificationChannel { + @Value("${monitoring.notification_channels.slack.webhook_url}") + private String webhookUrl; + + private RestTemplate restTemplate; + + @PostConstruct + private void init() { + restTemplate = new RestTemplateBuilder() + .setConnectTimeout(Duration.ofSeconds(5)) + .setReadTimeout(Duration.ofSeconds(2)) + .build(); + } + + @Override + public void sendNotification(NotificationInfo notificationInfo) { + String message = createNotificationMessage(notificationInfo); + restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java new file mode 100644 index 0000000000..2a7f7ffe70 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -0,0 +1,32 @@ +package org.thingsboard.monitoring.service; + +import org.springframework.stereotype.Service; +import org.thingsboard.monitoring.data.TransportInfo; + +@Service +public class MonitoringReporter { + + public void reportTransportRequestLatency(TransportInfo transportInfo, long latencyInNanos) { + double latencyInMs = (double) latencyInNanos / 1000_000; + } + + public void reportTransportConnectLatency(TransportInfo transportInfo, long latencyInNanos) { + + } + + public void reportWsUpdateLatency(long latencyInNanos) { + + } + + public void reportWsConnectLatency(long latencyInNanos) { + + } + + public void reportLogInLatency(long latencyInNanos) { + + } + + public void reportFailure(TransportInfo transportInfo, Exception error) { + + } +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java new file mode 100644 index 0000000000..22e24cf4e1 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/TransportMonitoringService.java @@ -0,0 +1,199 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service; + +import com.fasterxml.jackson.databind.node.TextNode; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.monitoring.client.TbClientFactory; +import org.thingsboard.monitoring.client.WsClient; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.WsConfig; +import org.thingsboard.monitoring.config.service.TransportMonitoringServiceConfig; +import org.thingsboard.monitoring.data.TransportInfo; +import org.thingsboard.monitoring.data.notification.MonitoringFailureNotificationInfo; +import org.thingsboard.monitoring.data.notification.MonitoringRecoveryNotificationInfo; +import org.thingsboard.monitoring.data.notification.TransportFailureNotificationInfo; +import org.thingsboard.monitoring.data.notification.TransportRecoveryNotificationInfo; +import org.thingsboard.monitoring.notification.NotificationService; + +import javax.annotation.PostConstruct; +import java.net.URI; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public abstract class TransportMonitoringService { + + protected final C config; + protected final MonitoringTargetConfig target; + private TransportInfo transportInfo; + @Autowired + private WsConfig wsConfig; + + @Autowired + private NotificationService notificationService; + @Autowired + private MonitoringReporter monitoringReporter; + @Autowired + @Qualifier("monitoringExecutor") + private ScheduledExecutorService monitoringExecutor; + @Autowired + @Qualifier("requestExecutor") + private ExecutorService requestExecutor; + @Autowired + private TbClientFactory tbClientFactory; + + private final AtomicInteger transportFailuresCounter = new AtomicInteger(); + private final AtomicInteger monitoringFailuresCounter = new AtomicInteger(); + + private final StopWatch stopWatch = new StopWatch(); + + protected static final String TEST_TELEMETRY_KEY = "testData"; + + protected TransportMonitoringService(C config, MonitoringTargetConfig target) { + this.config = config; + this.target = target; + } + + @PostConstruct + public void startMonitoring() { + transportInfo = new TransportInfo(getTransportType(), target.getBaseUrl()); + // todo: create devices + monitoringExecutor.scheduleWithFixedDelay(() -> { + try { + startStopWatch(); + initClient(); + monitoringReporter.reportTransportConnectLatency(transportInfo, getElapsedTime()); + + WsClient wsClient = establishWsClient(); + wsClient.registerWaitForUpdate(); + String testPayload = createTestPayload(UUID.randomUUID().toString()); + + startStopWatch(); + Future resultFuture = requestExecutor.submit(() -> { + try { + sendTestPayload(testPayload); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + resultFuture.get(config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); + monitoringReporter.reportTransportRequestLatency(transportInfo, getElapsedTime()); + + startStopWatch(); + wsClient.waitForUpdate(wsConfig.getResultCheckTimeoutMs()); + Object update = wsClient.getTelemetryKeyUpdate(TEST_TELEMETRY_KEY); + boolean success = update != null && update.toString().equals(testPayload); + if (!success) { + throw new RuntimeException("No WS update arrived"); + } + monitoringReporter.reportWsUpdateLatency(getElapsedTime()); + wsClient.closeBlocking(); + destroyClient(); + } catch (Exception e) { + monitoringReporter.reportFailure(transportInfo, e); + } + }, config.getInitialDelayMs(), config.getMonitoringRateMs(), TimeUnit.MILLISECONDS); + log.info("Started monitoring for transport type {} for target {}", getTransportType(), target); + } + + private WsClient establishWsClient() throws Exception { + startStopWatch(); + String accessToken = tbClientFactory.createClient().logIn(); + monitoringReporter.reportLogInLatency(getElapsedTime()); + + URI uri = new URI(wsConfig.getBaseUrl() + "/api/ws/plugins/telemetry?token=" + accessToken); + startStopWatch(); + WsClient wsClient = new WsClient(uri); + boolean connected = wsClient.connectBlocking(wsConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS); + if (!connected) { + throw new IllegalStateException("Failed to establish WS session"); + } + wsClient.subscribeForTelemetry(target.getDevice().getId(), TEST_TELEMETRY_KEY); + Optional.ofNullable(wsClient.waitForReply(wsConfig.getRequestTimeoutMs())) + .orElseThrow(() -> new IllegalStateException("Failed to subscribe for telemetry")); + monitoringReporter.reportWsConnectLatency(getElapsedTime()); + return wsClient; + } + + protected abstract void initClient() throws Exception; + + protected String createTestPayload(String testValue) { + return JacksonUtil.newObjectNode().set(TEST_TELEMETRY_KEY, new TextNode(testValue)).toString(); + } + + protected abstract void sendTestPayload(String payload) throws Exception; + + protected abstract void destroyClient() throws Exception; + + + private void startStopWatch() { + stopWatch.start(); + } + + private long getElapsedTime() { + stopWatch.stop(); + long nanoTime = stopWatch.getNanoTime(); + stopWatch.reset(); + return nanoTime; + } + + private void onTransportFailure(Exception e) { + log.debug("[{}] Transport failure", transportInfo, e); + + int failuresCount = transportFailuresCounter.incrementAndGet(); + if (failuresCount == config.getFailureThreshold()) { + notificationService.notify(new TransportFailureNotificationInfo(transportInfo, e)); + } + } + + private void onMonitoringFailure(Exception e) { + log.debug("[{}] Monitoring failure", transportInfo, e); + + int failuresCount = monitoringFailuresCounter.incrementAndGet(); + if (failuresCount == config.getFailureThreshold()) { + notificationService.notify(new MonitoringFailureNotificationInfo(transportInfo, e)); + } + } + + private void onTransportIsOk() { + log.debug("[{}] Transport is OK", transportInfo); + + if (transportFailuresCounter.get() >= config.getFailureThreshold()) { + notificationService.notify(new TransportRecoveryNotificationInfo(transportInfo)); + } + if (monitoringFailuresCounter.get() >= config.getFailureThreshold()) { + notificationService.notify(new MonitoringRecoveryNotificationInfo(transportInfo)); + } + + transportFailuresCounter.set(0); + monitoringFailuresCounter.set(0); + } + + + protected abstract TransportType getTransportType(); + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java new file mode 100644 index 0000000000..912d3d78c7 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/CoapTransportMonitoringService.java @@ -0,0 +1,69 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.impl; + +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.service.CoapTransportMonitoringServiceConfig; +import org.thingsboard.monitoring.service.TransportMonitoringService; + +import java.io.IOException; + +@Service +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class CoapTransportMonitoringService extends TransportMonitoringService { + + private CoapClient coapClient; + + protected CoapTransportMonitoringService(CoapTransportMonitoringServiceConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + String uri = target.getBaseUrl() + "/api/v1/" + target.getDevice().getAccessToken() + "/telemetry"; + coapClient = new CoapClient(uri); + coapClient.setTimeout((long) config.getRequestTimeoutMs()); + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + CoapResponse response = coapClient.post(payload, MediaTypeRegistry.APPLICATION_JSON); + CoAP.ResponseCode code = response.getCode(); + if (code.codeClass != CoAP.CodeClass.SUCCESS_RESPONSE.value) { + throw new IOException("COAP client didn't receive success response from transport"); + } + } + + @Override + protected void destroyClient() throws Exception { + coapClient.shutdown(); + coapClient = null; + } + + @Override + protected TransportType getTransportType() { + return TransportType.COAP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/HttpTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/HttpTransportMonitoringService.java new file mode 100644 index 0000000000..7512a657ab --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/HttpTransportMonitoringService.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.impl; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.HttpTransportMonitoringServiceConfig; +import org.thingsboard.monitoring.service.TransportMonitoringService; + +import javax.annotation.PostConstruct; +import java.time.Duration; + +@Service +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class HttpTransportMonitoringService extends TransportMonitoringService { + + private RestTemplate restTemplate; + + protected HttpTransportMonitoringService(HttpTransportMonitoringServiceConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + if (restTemplate == null) { + restTemplate = new RestTemplateBuilder() + .setConnectTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) + .setReadTimeout(Duration.ofMillis(config.getRequestTimeoutMs())) + .build(); + } + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + restTemplate.postForObject(target.getBaseUrl() + "/api/v1/" + target.getDevice().getAccessToken() + "/telemetry", payload, String.class); + } + + @Override + protected void destroyClient() throws Exception {} + + @Override + protected TransportType getTransportType() { + return TransportType.HTTP; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/Lwm2mTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/Lwm2mTransportMonitoringService.java new file mode 100644 index 0000000000..4fb34969a5 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/Lwm2mTransportMonitoringService.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.impl; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.service.Lwm2mTransportMonitoringServiceConfig; +import org.thingsboard.monitoring.service.TransportMonitoringService; +import org.thingsboard.monitoring.client.Lwm2mClient; + +@Service +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class Lwm2mTransportMonitoringService extends TransportMonitoringService { + + private Lwm2mClient lwm2mClient; + + protected Lwm2mTransportMonitoringService(Lwm2mTransportMonitoringServiceConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + lwm2mClient = new Lwm2mClient(target.getBaseUrl(), target.getDevice().getAccessToken()); + lwm2mClient.initClient(); + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + lwm2mClient.send(payload); + } + + @Override + protected String createTestPayload(String testValue) { + return testValue; + } + + @Override + protected void destroyClient() throws Exception { + lwm2mClient.destroy(); + lwm2mClient = null; + } + + @Override + protected TransportType getTransportType() { + return TransportType.LWM2M; + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java new file mode 100644 index 0000000000..52674a77ca --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/impl/MqttTransportMonitoringService.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service.impl; + +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; +import org.thingsboard.monitoring.config.TransportType; +import org.thingsboard.monitoring.config.MonitoringTargetConfig; +import org.thingsboard.monitoring.config.service.MqttTransportMonitoringServiceConfig; +import org.thingsboard.monitoring.service.TransportMonitoringService; + +@Service +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class MqttTransportMonitoringService extends TransportMonitoringService { + + private MqttAsyncClient mqttClient; + + private static final String DEVICE_TELEMETRY_TOPIC = "v1/devices/me/telemetry"; + + protected MqttTransportMonitoringService(MqttTransportMonitoringServiceConfig config, MonitoringTargetConfig target) { + super(config, target); + } + + @Override + protected void initClient() throws Exception { + String clientId = MqttAsyncClient.generateClientId(); + mqttClient = new MqttAsyncClient(target.getBaseUrl(), clientId, new MemoryPersistence()); + + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(target.getDevice().getAccessToken()); + mqttClient.connect(options).waitForCompletion(config.getRequestTimeoutMs()); + } + + @Override + protected void sendTestPayload(String payload) throws Exception { + MqttMessage message = new MqttMessage(); + message.setPayload(payload.getBytes()); + message.setQos(config.getQos()); + mqttClient.publish(DEVICE_TELEMETRY_TOPIC, message).waitForCompletion(config.getRequestTimeoutMs()); + } + + @Override + protected void destroyClient() throws Exception { + mqttClient.disconnect(); + mqttClient.close(); + mqttClient = null; + } + + @Override + protected TransportType getTransportType() { + return TransportType.MQTT; + } + +} diff --git a/monitoring/src/main/resources/logback.xml b/monitoring/src/main/resources/logback.xml new file mode 100644 index 0000000000..88873263f6 --- /dev/null +++ b/monitoring/src/main/resources/logback.xml @@ -0,0 +1,37 @@ + + + + + + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/monitoring/src/main/resources/lwm2m/0.xml b/monitoring/src/main/resources/lwm2m/0.xml new file mode 100644 index 0000000000..b49381fb28 --- /dev/null +++ b/monitoring/src/main/resources/lwm2m/0.xml @@ -0,0 +1,364 @@ + + + + + LWM2M Security + + 0 + urn:oma:lwm2m:oma:0:1.2 + 1.1 + 1.2 + Multiple + Mandatory + + + LWM2M Server URI + + Single + Mandatory + String + 0..255 + + + + + Bootstrap-Server + + Single + Mandatory + Boolean + + + + + + Security Mode + + Single + Mandatory + Integer + 0..4 + + + + + Public Key or Identity + + Single + Mandatory + Opaque + + + + + + Server Public Key + + Single + Mandatory + Opaque + + + + + + Secret Key + + Single + Mandatory + Opaque + + + + + + SMS Security Mode + + Single + Optional + Integer + 0..255 + + + + + SMS Binding Key Parameters + + Single + Optional + Opaque + 6 + + + + + SMS Binding Secret Key(s) + + Single + Optional + Opaque + 16,32,48 + + + + + LwM2M Server SMS Number + + Single + Optional + String + + + + + + Short Server ID + + Single + Optional + Integer + 1..65534 + + + + + Client Hold Off Time + + Single + Optional + Integer + + s + + + + Bootstrap-Server Account Timeout + + Single + Optional + Integer + + s + + + + Matching Type + + Single + Optional + Integer + 0..3 + + + + + SNI + + Single + Optional + String + + + + + + Certificate Usage + + Single + Optional + Integer + 0..3 + + + + + DTLS/TLS Ciphersuite + + Multiple + Optional + Integer + + + + + OSCORE Security Mode + + Single + Optional + Objlnk + + + + + + Groups To Use by Client + + Multiple + Optional + Integer + 0..65535 + + + + + Signature Algorithms Supported by Server + + Multiple + Optional + Integer + 0..65535 + + + + Signature Algorithms To Use by Client + + Multiple + Optional + Integer + 0..65535 + + + + + Signature Algorithm Certs Supported by Server + + Multiple + Optional + Integer + 0..65535 + + + + + TLS 1.3 Features To Use by Client + + Single + Optional + Integer + 0..65535 + + + + + TLS Extensions Supported by Server + + Single + Optional + Integer + 0..65535 + + + + + TLS Extensions To Use by Client + + Single + Optional + Integer + 0..65535 + + + + + Secondary LwM2M Server URI + + Multiple + Optional + String + 0..255 + + + + MQTT Server + + Single + Optional + Objlnk + + + + + LwM2M COSE Security + + Multiple + Optional + Objlnk + + + + + RDS Destination Port + + Single + Optional + Integer + 0..15 + + + + RDS Source Port + + Single + Optional + Integer + 0..15 + + + + RDS Application ID + + Single + Optional + String + + + + + + + + diff --git a/monitoring/src/main/resources/lwm2m/1.xml b/monitoring/src/main/resources/lwm2m/1.xml new file mode 100644 index 0000000000..45946b118b --- /dev/null +++ b/monitoring/src/main/resources/lwm2m/1.xml @@ -0,0 +1,319 @@ + + + + + LwM2M Server + + 1 + urn:oma:lwm2m:oma:1:1.2 + 1.2 + 1.2 + Multiple + Mandatory + + + Short Server ID + R + Single + Mandatory + Integer + 1..65534 + + + + + Lifetime + RW + Single + Mandatory + Integer + + s + + + + Default Minimum Period + RW + Single + Optional + Integer + + s + + + + Default Maximum Period + RW + Single + Optional + Integer + + s + + + + Disable + E + Single + Optional + + + + + + + Disable Timeout + RW + Single + Optional + Integer + + s + + + + Notification Storing When Disabled or Offline + RW + Single + Mandatory + Boolean + + + + + + Binding + RW + Single + Mandatory + String + + + + + + Registration Update Trigger + E + Single + Mandatory + + + + + + + Bootstrap-Request Trigger + E + Single + Optional + + + + + + + APN Link + RW + Single + Optional + Objlnk + + + + + + TLS-DTLS Alert Code + R + Single + Optional + Integer + 0..255 + + + + + Last Bootstrapped + R + Single + Optional + Time + + + + + + Registration Priority Order + R + Single + Optional + Integer + + + + + + Initial Registration Delay Timer + RW + Single + Optional + Integer + + s + + + + Registration Failure Block + R + Single + Optional + Boolean + + + + + + Bootstrap on Registration Failure + R + Single + Optional + Boolean + + + + + + Communication Retry Count + RW + Single + Optional + Integer + + + + + + Communication Retry Timer + RW + Single + Optional + Integer + + s + + + + Communication Sequence Delay Timer + RW + Single + Optional + Integer + + s + + + + Communication Sequence Retry Count + RW + Single + Optional + Integer + + + + + + Trigger + RW + Single + Optional + Boolean + + + + + + Preferred Transport + RW + Single + Optional + String + The possible values are those listed in the LwM2M Core Specification + + + + Mute Send + RW + Single + Optional + Boolean + + + + + + Alternate APN Links + RW + Multiple + Optional + Objlnk + + + + + + Supported Server Versions + RW + Multiple + Optional + String + + + + + + Default Notification Mode + RW + Single + Optional + Integer + 0..1 + + + + + Profile ID Hash Algorithm + RW + Single + Optional + Integer + 0..255 + + + + + + + diff --git a/monitoring/src/main/resources/lwm2m/2.xml b/monitoring/src/main/resources/lwm2m/2.xml new file mode 100644 index 0000000000..fc4d0c6d1c --- /dev/null +++ b/monitoring/src/main/resources/lwm2m/2.xml @@ -0,0 +1,83 @@ + + + + + LwM2M Access Control + + 2 + urn:oma:lwm2m:oma:2:1.1 + 1.0 + 1.1 + Multiple + Optional + + + Object ID + R + Single + Mandatory + Integer + 1..65534 + + + + + Object Instance ID + R + Single + Mandatory + Integer + 0..65535 + + + + + ACL + RW + Multiple + Optional + Integer + 0..31 + + + + + Access Control Owner + RW + Single + Mandatory + Integer + 0..65535 + + + + + + + diff --git a/monitoring/src/main/resources/lwm2m/3.xml b/monitoring/src/main/resources/lwm2m/3.xml new file mode 100644 index 0000000000..bab017e23b --- /dev/null +++ b/monitoring/src/main/resources/lwm2m/3.xml @@ -0,0 +1,290 @@ + + + + + Device + + 3 + urn:oma:lwm2m:oma:3:1.0 + 1.1 + 1.0 + Single + Mandatory + + + Manufacturer + R + Single + Optional + String + + + + + + Model Number + R + Single + Optional + String + + + + + + Serial Number + R + Single + Optional + String + + + + + + Firmware Version + R + Single + Optional + String + + + + + + Reboot + E + Single + Mandatory + + + + + + + Factory Reset + E + Single + Optional + + + + + + + Available Power Sources + R + Multiple + Optional + Integer + 0..7 + + + + + Power Source Voltage + R + Multiple + Optional + Integer + + + + + + Power Source Current + R + Multiple + Optional + Integer + + + + + + Battery Level + R + Single + Optional + Integer + 0..100 + /100 + + + + Memory Free + R + Single + Optional + Integer + + + + + + Error Code + R + Multiple + Mandatory + Integer + 0..32 + + + + + Reset Error Code + E + Single + Optional + + + + + + + Current Time + RW + Single + Optional + Time + + + + + + UTC Offset + RW + Single + Optional + String + + + + + + Timezone + RW + Single + Optional + String + + + + + + Supported Binding and Modes + R + Single + Mandatory + String + + + + + Device Type + R + Single + Optional + String + + + + + Hardware Version + R + Single + Optional + String + + + + + Software Version + R + Single + Optional + String + + + + + Battery Status + R + Single + Optional + Integer + 0..6 + + + + Memory Total + R + Single + Optional + Integer + + + + + ExtDevInfo + R + Multiple + Optional + Objlnk + + + + + + + diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml new file mode 100644 index 0000000000..3ad0e8935a --- /dev/null +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -0,0 +1,78 @@ +# +# Copyright © 2016-2022 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +monitoring: + auth: + base_url: '${AUTH_BASE_URL:http://localhost:8080}' + username: '${AUTH_USERNAME:tenant@thingsboard.org}' + password: '${AUTH_PASSWORD:tenant}' + ws: + base_url: '${WS_BASE_URL:ws://localhost:8080}' + check_timeout_ms: '${WS_CHECK_TIMEOUT_MS:3000}' + request_timeout_ms: '${WS_REQUEST_TIMEOUT_MS:5000}' + + failure_threshold: '${MONITORING_FAILURE_THRESHOLD:1}' + + transports: + mqtt: + enabled: '${MQTT_TRANSPORT_MONITORING_ENABLED:true}' + monitoring_rate_ms: '${MQTT_TRANSPORT_MONITORING_RATE_MS:10000}' + request_timeout_ms: '${MQTT_REQUEST_TIMEOUT_MS:4000}' + initial_delay_ms: '${MQTT_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' + qos: '${MQTT_QOS_LEVEL:1}' + targets: + - base_url: "tcp://localhost" + device: + id: + access_token: + + coap: + enabled: '${COAP_TRANSPORT_MONITORING_ENABLED:false}' + monitoring_rate_ms: '${COAP_TRANSPORT_MONITORING_RATE_MS:10000}' + request_timeout_ms: '${COAP_REQUEST_TIMEOUT_MS:4000}' + initial_delay_ms: '${COAP_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' + targets: + - base_url: + device: + id: + access_token: + + lwm2m: + enabled: '${LWM2M_TRANSPORT_MONITORING_ENABLED:false}' + monitoring_rate_ms: '${LWM2M_TRANSPORT_MONITORING_RATE_MS:10000}' + request_timeout_ms: '${LWM2M_REQUEST_TIMEOUT_MS:4000}' + initial_delay_ms: '${LWM2M_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' + targets: + - base_url: + device: + id: + access_token: + + http: + enabled: '${HTTP_TRANSPORT_MONITORING_ENABLED:false}' + monitoring_rate_ms: '${HTTP_TRANSPORT_MONITORING_RATE_MS:10000}' + request_timeout_ms: '${HTTP_REQUEST_TIMEOUT_MS:4000}' + initial_delay_ms: '${HTTP_TRANSPORT_MONITORING_INITIAL_DELAY_MS:0}' + targets: + - base_url: + device: + id: + access_token: + + notification_channels: + slack: + enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}' + webhook_url: '${SLACK_WEBHOOK_URL:}' diff --git a/monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java b/monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java new file mode 100644 index 0000000000..02f30518f6 --- /dev/null +++ b/monitoring/src/test/java/org/thingsboard/monitoring/MonitoringServiceApplicationTests.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class MonitoringServiceApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/pom.xml b/pom.xml index 60e8783bca..c931c9b1be 100755 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,7 @@ application msa rest-client + monitoring