Merge pull request #13366 from thingsboard/cf-monitoring

Calculated fields monitoring
This commit is contained in:
Viacheslav Klimov 2025-05-12 15:22:20 +03:00 committed by GitHub
commit 55db24f7e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 612 additions and 410 deletions

View File

@ -25,6 +25,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.monitoring.service.BaseMonitoringService; import org.thingsboard.monitoring.service.BaseMonitoringService;
import org.thingsboard.monitoring.service.MonitoringEntityService;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -38,6 +39,8 @@ public class ThingsboardMonitoringApplication {
@Autowired @Autowired
private List<BaseMonitoringService<?, ?>> monitoringServices; private List<BaseMonitoringService<?, ?>> monitoringServices;
@Autowired
private MonitoringEntityService entityService;
@Value("${monitoring.monitoring_rate_ms}") @Value("${monitoring.monitoring_rate_ms}")
private int monitoringRateMs; private int monitoringRateMs;
@ -50,6 +53,9 @@ public class ThingsboardMonitoringApplication {
@EventListener(ApplicationReadyEvent.class) @EventListener(ApplicationReadyEvent.class)
public void startMonitoring() { public void startMonitoring() {
entityService.checkEntities();
monitoringServices.forEach(BaseMonitoringService::init);
ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("monitoring-executor"); ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("monitoring-executor");
scheduler.scheduleWithFixedDelay(() -> { scheduler.scheduleWithFixedDelay(() -> {
monitoringServices.forEach(monitoringService -> { monitoringServices.forEach(monitoringService -> {

View File

@ -15,17 +15,15 @@
*/ */
package org.thingsboard.monitoring.client; package org.thingsboard.monitoring.client;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.rest.client.RestClient; import org.thingsboard.rest.client.RestClient;
import java.time.Duration; import java.time.Duration;
@Component @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class TbClient extends RestClient { public class TbClient extends RestClient {
@Value("${monitoring.rest.username}") @Value("${monitoring.rest.username}")
@ -41,6 +39,11 @@ public class TbClient extends RestClient {
.build(), baseUrl); .build(), baseUrl);
} }
@PostConstruct
private void init() {
logIn();
}
public String logIn() { public String logIn() {
login(username, password); login(username, password);
return getToken(); return getToken();

View File

@ -36,8 +36,11 @@ import org.thingsboard.server.common.data.query.EntityListFilter;
import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLParameters;
import java.net.URI; import java.net.URI;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -48,13 +51,13 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class WsClient extends WebSocketClient implements AutoCloseable { public class WsClient extends WebSocketClient implements AutoCloseable {
public volatile JsonNode lastMsg; public final List<JsonNode> lastMsgs = new ArrayList<>();
private CountDownLatch reply; private CountDownLatch reply;
private CountDownLatch update; private CountDownLatch update;
private final Lock updateLock = new ReentrantLock(); private final Lock updateLock = new ReentrantLock();
private long requestTimeoutMs; private final long requestTimeoutMs;
public WsClient(URI serverUri, long requestTimeoutMs) { public WsClient(URI serverUri, long requestTimeoutMs) {
super(serverUri); super(serverUri);
@ -63,7 +66,6 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
@Override @Override
public void onOpen(ServerHandshake serverHandshake) { public void onOpen(ServerHandshake serverHandshake) {
} }
@Override @Override
@ -73,8 +75,9 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
} }
updateLock.lock(); updateLock.lock();
try { try {
lastMsg = JacksonUtil.toJsonNode(s); JsonNode msg = JacksonUtil.toJsonNode(s);
log.trace("Received new msg: {}", lastMsg.toPrettyString()); lastMsgs.add(msg);
log.trace("Received new msg: {}", msg.toPrettyString());
if (update != null) { if (update != null) {
update.countDown(); update.countDown();
} }
@ -96,11 +99,11 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
log.error("WebSocket client error:", e); log.error("WebSocket client error:", e);
} }
public void registerWaitForUpdate() { public void registerWaitForUpdates(int count) {
updateLock.lock(); updateLock.lock();
try { try {
lastMsg = null; lastMsgs.clear();
update = new CountDownLatch(1); update = new CountDownLatch(count);
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
} }
@ -111,6 +114,7 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
public void send(String text) throws NotYetConnectedException { public void send(String text) throws NotYetConnectedException {
updateLock.lock(); updateLock.lock();
try { try {
lastMsgs.clear();
reply = new CountDownLatch(1); reply = new CountDownLatch(1);
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
@ -118,19 +122,19 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
super.send(text); super.send(text);
} }
public WsClient subscribeForTelemetry(List<UUID> devices, String key) { public WsClient subscribeForTelemetry(List<UUID> devices, List<String> keys) {
EntityDataCmd cmd = new EntityDataCmd(); EntityDataCmd cmd = new EntityDataCmd();
cmd.setCmdId(RandomUtils.nextInt(0, 1000)); cmd.setCmdId(RandomUtils.nextInt(0, 1000));
EntityListFilter devicesFilter = new EntityListFilter(); EntityListFilter devicesFilter = new EntityListFilter();
devicesFilter.setEntityType(EntityType.DEVICE); devicesFilter.setEntityType(EntityType.DEVICE);
devicesFilter.setEntityList(devices.stream().map(UUID::toString).collect(Collectors.toList())); devicesFilter.setEntityList(devices.stream().map(UUID::toString).collect(Collectors.toList()));
EntityDataPageLink pageLink = new EntityDataPageLink(100,0, null, null); EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, null);
EntityDataQuery devicesQuery = new EntityDataQuery(devicesFilter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); EntityDataQuery devicesQuery = new EntityDataQuery(devicesFilter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
cmd.setQuery(devicesQuery); cmd.setQuery(devicesQuery);
LatestValueCmd latestCmd = new LatestValueCmd(); LatestValueCmd latestCmd = new LatestValueCmd();
latestCmd.setKeys(List.of(new EntityKey(EntityKeyType.TIME_SERIES, key))); latestCmd.setKeys(keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).toList());
cmd.setLatestCmd(latestCmd); cmd.setLatestCmd(latestCmd);
CmdsWrapper wrapper = new CmdsWrapper(); CmdsWrapper wrapper = new CmdsWrapper();
@ -139,12 +143,12 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
return this; return this;
} }
public JsonNode waitForUpdate(long ms) { public List<JsonNode> waitForUpdates(long ms) {
log.trace("update latch count: {}", update.getCount()); log.trace("update latch count: {}", update.getCount());
try { try {
if (update.await(ms, TimeUnit.MILLISECONDS)) { if (update.await(ms, TimeUnit.MILLISECONDS)) {
log.trace("Waited for update"); log.trace("Waited for update");
return getLastMsg(); return getLastMsgs();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.debug("Failed to await reply", e); log.debug("Failed to await reply", e);
@ -157,7 +161,8 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
try { try {
if (reply.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { if (reply.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) {
log.trace("Waited for reply"); log.trace("Waited for reply");
return getLastMsg(); List<JsonNode> lastMsgs = getLastMsgs();
return lastMsgs.isEmpty() ? null : lastMsgs.get(0);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.debug("Failed to await reply", e); log.debug("Failed to await reply", e);
@ -166,24 +171,30 @@ public class WsClient extends WebSocketClient implements AutoCloseable {
throw new IllegalStateException("No WS reply arrived within " + requestTimeoutMs + " ms"); throw new IllegalStateException("No WS reply arrived within " + requestTimeoutMs + " ms");
} }
private JsonNode getLastMsg() { private List<JsonNode> getLastMsgs() {
if (lastMsg != null) { if (lastMsgs.isEmpty()) {
JsonNode errorMsg = lastMsg.get("errorMsg"); return lastMsgs;
if (errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) {
throw new RuntimeException("WS error from server: " + errorMsg.asText());
} else {
return lastMsg;
}
} else {
return null;
} }
List<JsonNode> errors = lastMsgs.stream()
.map(msg -> msg.get("errorMsg"))
.filter(errorMsg -> errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText()))
.toList();
if (!errors.isEmpty()) {
throw new RuntimeException("WS error from server: " + errors.stream()
.map(JsonNode::asText)
.collect(Collectors.joining(", ")));
}
return lastMsgs;
} }
public Object getTelemetryUpdate(UUID deviceId, String key) { public Map<String, String> getLatest(UUID deviceId) {
JsonNode lastMsg = getLastMsg(); Map<String, String> updates = new HashMap<>();
if (lastMsg == null || lastMsg.isNull()) return null; getLastMsgs().forEach(msg -> {
EntityDataUpdate update = JacksonUtil.treeToValue(lastMsg, EntityDataUpdate.class); EntityDataUpdate update = JacksonUtil.treeToValue(msg, EntityDataUpdate.class);
return update.getLatest(deviceId, key); Map<String, String> latest = update.getLatest(deviceId);
updates.putAll(latest);
});
return updates;
} }
@Override @Override

View File

@ -20,16 +20,15 @@ import lombok.Data;
@Data @Data
public class TransportInfo { public class TransportInfo {
private final TransportType transportType; private final TransportType type;
private final String baseUrl; private final TransportMonitoringTarget target;
private final String queue;
@Override @Override
public String toString() { public String toString() {
if (queue.equals("Main")) { if (target.getQueue().equals("Main")) {
return String.format("*%s* (%s)", transportType.getName(), baseUrl); return String.format("*%s* (%s)", type.getName(), target.getBaseUrl());
} else { } else {
return String.format("*%s* (%s) _%s_", transportType.getName(), baseUrl, queue); return String.format("*%s* (%s) _%s_", type.getName(), target.getBaseUrl(), target.getQueue());
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.monitoring.config.transport; package org.thingsboard.monitoring.config.transport;
import com.google.common.base.Strings;
import lombok.Data; import lombok.Data;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.thingsboard.monitoring.config.MonitoringTarget; import org.thingsboard.monitoring.config.MonitoringTarget;
@ -28,6 +29,7 @@ public class TransportMonitoringTarget implements MonitoringTarget {
private DeviceConfig device; // set manually during initialization private DeviceConfig device; // set manually during initialization
private String queue; private String queue;
private boolean checkDomainIps; private boolean checkDomainIps;
private String namePrefix;
@Override @Override
public UUID getDeviceId() { public UUID getDeviceId() {
@ -38,4 +40,8 @@ public class TransportMonitoringTarget implements MonitoringTarget {
return StringUtils.defaultIfEmpty(queue, "Main"); return StringUtils.defaultIfEmpty(queue, "Main");
} }
public String getNamePrefix() {
return Strings.nullToEmpty(namePrefix);
}
} }

View File

@ -15,14 +15,21 @@
*/ */
package org.thingsboard.monitoring.data; package org.thingsboard.monitoring.data;
import lombok.Getter;
@Getter
public class ServiceFailureException extends RuntimeException { public class ServiceFailureException extends RuntimeException {
public ServiceFailureException(Throwable cause) { private final Object serviceKey;
public ServiceFailureException(Object serviceKey, Throwable cause) {
super(cause.getMessage(), cause); super(cause.getMessage(), cause);
this.serviceKey = serviceKey;
} }
public ServiceFailureException(String message) { public ServiceFailureException(Object serviceKey, String message) {
super(message); super(message);
this.serviceKey = serviceKey;
} }
} }

View File

@ -19,9 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data; import lombok.Data;
import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.TsValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
@Data @Data
@ -31,14 +33,16 @@ public class EntityDataUpdate {
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
private List<EntityData> update; private List<EntityData> update;
public String getLatest(UUID entityId, String key) { public Map<String, String> getLatest(UUID entityId) {
if (update == null) return null; if (update == null || update.isEmpty()) {
return Collections.emptyMap();
return update.stream() }
Map<String, String> result = new HashMap<>();
update.stream()
.filter(entityData -> entityData.getEntityId().getId().equals(entityId)).findFirst() .filter(entityData -> entityData.getEntityId().getId().equals(entityId)).findFirst()
.map(EntityData::getLatest).map(latest -> latest.get(EntityKeyType.TIME_SERIES)) .map(EntityData::getLatest).map(latest -> latest.get(EntityKeyType.TIME_SERIES))
.map(latest -> latest.get(key)).map(TsValue::getValue) .ifPresent(latest -> latest.forEach((key, tsValue) -> result.put(key, tsValue.getValue())));
.orElse(null); return result;
} }
} }

View File

@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.client.WsClient; import org.thingsboard.monitoring.client.WsClient;
import org.thingsboard.monitoring.config.MonitoringConfig; import org.thingsboard.monitoring.config.MonitoringConfig;
import org.thingsboard.monitoring.config.MonitoringTarget; import org.thingsboard.monitoring.config.MonitoringTarget;
@ -46,6 +45,8 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
private Object info; private Object info;
@Autowired
protected MonitoringEntityService entityService;
@Autowired @Autowired
private MonitoringReporter reporter; private MonitoringReporter reporter;
@Autowired @Autowired
@ -57,18 +58,20 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
private final Map<String, BaseHealthChecker<C, T>> associates = new HashMap<>(); private final Map<String, BaseHealthChecker<C, T>> associates = new HashMap<>();
public static final String TEST_TELEMETRY_KEY = "testData"; public static final String TEST_TELEMETRY_KEY = "testData";
public static final String TEST_CF_TELEMETRY_KEY = "testDataCf";
@PostConstruct @PostConstruct
private void init() { private void init() {
info = getInfo(); info = getInfo();
} }
protected abstract void initialize(TbClient tbClient); protected abstract void initialize();
public final void check(WsClient wsClient) { public final void check(WsClient wsClient) {
log.debug("[{}] Checking", info); log.debug("[{}] Checking", info);
try { try {
wsClient.registerWaitForUpdate(); int expectedUpdatesCount = isCfMonitoringEnabled() ? 2 : 1;
wsClient.registerWaitForUpdates(expectedUpdatesCount);
String testValue = UUID.randomUUID().toString(); String testValue = UUID.randomUUID().toString();
String testPayload = createTestPayload(testValue); String testPayload = createTestPayload(testValue);
@ -79,16 +82,16 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
reporter.reportLatency(Latencies.request(getKey()), stopWatch.getTime()); reporter.reportLatency(Latencies.request(getKey()), stopWatch.getTime());
log.trace("[{}] Sent test payload ({})", info, testPayload); log.trace("[{}] Sent test payload ({})", info, testPayload);
} catch (Throwable e) { } catch (Throwable e) {
throw new ServiceFailureException(e); throw new ServiceFailureException(info, e);
} }
log.trace("[{}] Waiting for WS update", info); log.trace("[{}] Waiting for WS update", info);
checkWsUpdate(wsClient, testValue); checkWsUpdates(wsClient, testValue);
reporter.serviceIsOk(info); reporter.serviceIsOk(info);
reporter.serviceIsOk(MonitoredServiceKey.GENERAL); reporter.serviceIsOk(MonitoredServiceKey.GENERAL);
} catch (ServiceFailureException serviceFailureException) { } catch (ServiceFailureException e) {
reporter.serviceFailure(info, serviceFailureException); reporter.serviceFailure(e.getServiceKey(), e);
} catch (Exception e) { } catch (Exception e) {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, e); reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
} }
@ -98,15 +101,26 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
}); });
} }
private void checkWsUpdate(WsClient wsClient, String testValue) { private void checkWsUpdates(WsClient wsClient, String testValue) {
stopWatch.start(); stopWatch.start();
wsClient.waitForUpdate(resultCheckTimeoutMs); wsClient.waitForUpdates(resultCheckTimeoutMs);
log.trace("[{}] Waited for WS update. Last WS msg: {}", info, wsClient.lastMsg); log.trace("[{}] Waited for WS update. Last WS msgs: {}", info, wsClient.lastMsgs);
Object update = wsClient.getTelemetryUpdate(target.getDeviceId(), TEST_TELEMETRY_KEY); Map<String, String> latest = wsClient.getLatest(target.getDeviceId());
if (update == null) { if (latest.isEmpty()) {
throw new ServiceFailureException("No WS update arrived within " + resultCheckTimeoutMs + " ms"); throw new ServiceFailureException(info, "No WS update arrived within " + resultCheckTimeoutMs + " ms");
} else if (!update.toString().equals(testValue)) { }
throw new ServiceFailureException("Was expecting value " + testValue + " but got " + update); String actualValue = latest.get(TEST_TELEMETRY_KEY);
if (!testValue.equals(actualValue)) {
throw new ServiceFailureException(info, "Was expecting value " + testValue + " but got " + actualValue);
}
if (isCfMonitoringEnabled()) {
String cfTestValue = testValue + "-cf";
String actualCfValue = latest.get(TEST_CF_TELEMETRY_KEY);
if (actualCfValue == null) {
throw new ServiceFailureException(info, "No calculated field value arrived");
} else if (!cfTestValue.equals(actualCfValue)) {
throw new ServiceFailureException(info, "Was expecting calculated field value " + cfTestValue + " but got " + actualCfValue);
}
} }
reporter.reportLatency(Latencies.wsUpdate(getKey()), stopWatch.getTime()); reporter.reportLatency(Latencies.wsUpdate(getKey()), stopWatch.getTime());
} }
@ -121,6 +135,9 @@ public abstract class BaseHealthChecker<C extends MonitoringConfig, T extends Mo
protected abstract void destroyClient() throws Exception; protected abstract void destroyClient() throws Exception;
protected abstract Object getInfo(); protected abstract Object getInfo();
protected abstract String getKey(); protected abstract String getKey();
protected abstract boolean isCfMonitoringEnabled();
} }

View File

@ -16,7 +16,6 @@
package org.thingsboard.monitoring.service; package org.thingsboard.monitoring.service;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import jakarta.annotation.PostConstruct;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -31,7 +30,6 @@ import org.thingsboard.monitoring.config.MonitoringTarget;
import org.thingsboard.monitoring.data.Latencies; import org.thingsboard.monitoring.data.Latencies;
import org.thingsboard.monitoring.data.MonitoredServiceKey; import org.thingsboard.monitoring.data.MonitoredServiceKey;
import org.thingsboard.monitoring.data.ServiceFailureException; import org.thingsboard.monitoring.data.ServiceFailureException;
import org.thingsboard.monitoring.service.transport.TransportHealthChecker;
import org.thingsboard.monitoring.util.TbStopWatch; import org.thingsboard.monitoring.util.TbStopWatch;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
@ -59,6 +57,9 @@ import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_CF_TELEMETRY_KEY;
import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_TELEMETRY_KEY;
@Slf4j @Slf4j
public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T extends MonitoringTarget> { public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T extends MonitoringTarget> {
@ -79,14 +80,15 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
protected ApplicationContext applicationContext; protected ApplicationContext applicationContext;
@Value("${monitoring.edqs.enabled:false}") @Value("${monitoring.edqs.enabled:false}")
private boolean edqsMonitoringEnabled; private boolean checkEdqs;
@Value("${monitoring.calculated_fields.enabled:true}")
protected boolean checkCalculatedFields;
@PostConstruct public void init() {
private void init() {
if (configs == null || configs.isEmpty()) { if (configs == null || configs.isEmpty()) {
return; return;
} }
tbClient.logIn();
configs.forEach(config -> { configs.forEach(config -> {
config.getTargets().forEach(target -> { config.getTargets().forEach(target -> {
BaseHealthChecker<C, T> healthChecker = initHealthChecker(target, config); BaseHealthChecker<C, T> healthChecker = initHealthChecker(target, config);
@ -104,7 +106,7 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
private BaseHealthChecker<C, T> initHealthChecker(T target, C config) { private BaseHealthChecker<C, T> initHealthChecker(T target, C config) {
BaseHealthChecker<C, T> healthChecker = (BaseHealthChecker<C, T>) createHealthChecker(config, target); BaseHealthChecker<C, T> healthChecker = (BaseHealthChecker<C, T>) createHealthChecker(config, target);
log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl()); log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl());
healthChecker.initialize(tbClient); healthChecker.initialize();
devices.add(target.getDeviceId()); devices.add(target.getDeviceId());
return healthChecker; return healthChecker;
} }
@ -121,7 +123,7 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
try (WsClient wsClient = wsClientFactory.createClient(accessToken)) { try (WsClient wsClient = wsClientFactory.createClient(accessToken)) {
stopWatch.start(); stopWatch.start();
wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply(); wsClient.subscribeForTelemetry(devices, getTestTelemetryKeys()).waitForReply();
reporter.reportLatency(Latencies.WS_SUBSCRIBE, stopWatch.getTime()); reporter.reportLatency(Latencies.WS_SUBSCRIBE, stopWatch.getTime());
for (BaseHealthChecker<C, T> healthChecker : healthCheckers) { for (BaseHealthChecker<C, T> healthChecker : healthCheckers) {
@ -129,22 +131,17 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
} }
} }
if (edqsMonitoringEnabled) { if (checkEdqs) {
try { stopWatch.start();
stopWatch.start(); checkEdqs();
checkEdqs(); reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime());
reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime()); reporter.serviceIsOk(MonitoredServiceKey.EDQS);
reporter.serviceIsOk(MonitoredServiceKey.EDQS);
} catch (ServiceFailureException e) {
reporter.serviceFailure(MonitoredServiceKey.EDQS, e);
} catch (Exception e) {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, e);
}
} }
reporter.reportLatencies(tbClient); reporter.reportLatencies();
log.debug("Finished {}", getName()); log.debug("Finished {}", getName());
} catch (ServiceFailureException e) {
reporter.serviceFailure(e.getServiceKey(), e);
} catch (Throwable error) { } catch (Throwable error) {
try { try {
reporter.serviceFailure(MonitoredServiceKey.GENERAL, error); reporter.serviceFailure(MonitoredServiceKey.GENERAL, error);
@ -199,7 +196,7 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
.collect(Collectors.toSet()); .collect(Collectors.toSet());
Set<UUID> missing = Sets.difference(new HashSet<>(this.devices), devices); Set<UUID> missing = Sets.difference(new HashSet<>(this.devices), devices);
if (!missing.isEmpty()) { if (!missing.isEmpty()) {
throw new ServiceFailureException("Missing devices in the response: " + missing); throw new ServiceFailureException(MonitoredServiceKey.EDQS, "Missing devices in the response: " + missing);
} }
result.getData().stream() result.getData().stream()
@ -211,7 +208,7 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
Stream.of("name", "type", "testData").forEach(key -> { Stream.of("name", "type", "testData").forEach(key -> {
TsValue value = values.get(key); TsValue value = values.get(key);
if (value == null || StringUtils.isBlank(value.getValue())) { if (value == null || StringUtils.isBlank(value.getValue())) {
throw new ServiceFailureException("Missing " + key + " for device " + entityData.getEntityId()); throw new ServiceFailureException(MonitoredServiceKey.EDQS, "Missing " + key + " for device " + entityData.getEntityId());
} }
}); });
}); });
@ -232,6 +229,10 @@ public abstract class BaseMonitoringService<C extends MonitoringConfig<T>, T ext
.collect(Collectors.toSet()); .collect(Collectors.toSet());
} }
private List<String> getTestTelemetryKeys() {
return checkCalculatedFields ? List.of(TEST_TELEMETRY_KEY, TEST_CF_TELEMETRY_KEY) : List.of(TEST_TELEMETRY_KEY);
}
private void stopHealthChecker(BaseHealthChecker<C, T> healthChecker) throws Exception { private void stopHealthChecker(BaseHealthChecker<C, T> healthChecker) throws Exception {
healthChecker.destroyClient(); healthChecker.destroyClient();
devices.remove(healthChecker.getTarget().getDeviceId()); devices.remove(healthChecker.getTarget().getDeviceId());

View File

@ -0,0 +1,252 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.monitoring.service;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.RegexUtils;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.util.ResourceUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential;
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_CF_TELEMETRY_KEY;
import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_TELEMETRY_KEY;
@Service
@Slf4j
@RequiredArgsConstructor
public class MonitoringEntityService {
private final TbClient tbClient;
@Value("${monitoring.calculated_fields.enabled:true}")
private boolean calculatedFieldsMonitoringEnabled;
public void checkEntities() {
RuleChain ruleChain = tbClient.getRuleChains(RuleChainType.CORE, new PageLink(10)).getData().stream()
.filter(RuleChain::isRoot)
.findFirst().orElseThrow();
RuleChainId ruleChainId = ruleChain.getId();
JsonNode ruleChainDescriptor = ResourceUtils.getResource("rule_chain.json");
List<String> attributeKeys = tbClient.getAttributeKeys(ruleChainId);
Map<String, String> attributes = tbClient.getAttributeKvEntries(ruleChainId, attributeKeys).stream()
.collect(Collectors.toMap(KvEntry::getKey, KvEntry::getValueAsString));
int currentVersion = Integer.parseInt(attributes.getOrDefault("version", "0"));
int newVersion = ruleChainDescriptor.get("version").asInt();
if (currentVersion == newVersion) {
log.info("Not updating rule chain, version is the same ({})", currentVersion);
return;
} else {
log.info("Updating rule chain '{}' from version {} to {}", ruleChain.getName(), currentVersion, newVersion);
}
String metadataJson = RegexUtils.replace(ruleChainDescriptor.get("metadata").toString(),
"\\$\\{MONITORING:(.+?)}", matchResult -> {
String key = matchResult.group(1);
String value = attributes.get(key);
if (value == null) {
throw new IllegalArgumentException("No attribute found for key " + key);
}
log.info("Using {}: {}", key, value);
return value;
});
RuleChainMetaData metaData = JacksonUtil.fromString(metadataJson, RuleChainMetaData.class);
metaData.setRuleChainId(ruleChainId);
tbClient.saveRuleChainMetaData(metaData);
tbClient.saveEntityAttributesV2(ruleChainId, DataConstants.SERVER_SCOPE, JacksonUtil.newObjectNode()
.put("version", newVersion));
}
public Asset getOrCreateMonitoringAsset() {
String assetName = "[Monitoring] Latencies";
return tbClient.findAsset(assetName).orElseGet(() -> {
Asset asset = new Asset();
asset.setType("Monitoring");
asset.setName(assetName);
asset = tbClient.saveAsset(asset);
log.info("Created monitoring asset {}", asset.getId());
return asset;
});
}
public void checkEntities(TransportMonitoringConfig config, TransportMonitoringTarget target) {
Device device = getOrCreateDevice(config, target);
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId())
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId()));
DeviceConfig deviceConfig = new DeviceConfig();
deviceConfig.setId(device.getId().toString());
deviceConfig.setName(device.getName());
deviceConfig.setCredentials(credentials);
target.setDevice(deviceConfig);
}
private Device getOrCreateDevice(TransportMonitoringConfig config, TransportMonitoringTarget target) {
TransportType transportType = config.getTransportType();
String deviceName = String.format("%s %s (%s) - %s", target.getNamePrefix(), transportType.getName(), target.getQueue(), target.getBaseUrl()).trim();
Device device = tbClient.getTenantDevice(deviceName).orElse(null);
if (device != null) {
if (calculatedFieldsMonitoringEnabled) {
CalculatedField calculatedField = tbClient.getCalculatedFieldsByEntityId(device.getId(), new PageLink(1, 0, TEST_CF_TELEMETRY_KEY))
.getData().stream().findFirst().orElse(null);
if (calculatedField == null) {
createCalculatedField(device);
}
}
return device;
}
log.info("Creating new device '{}'", deviceName);
device = new Device();
device.setName(deviceName);
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20));
DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration());
DeviceProfile deviceProfile = getOrCreateDeviceProfile(config, target);
device.setType(deviceProfile.getName());
device.setDeviceProfileId(deviceProfile.getId());
if (transportType != TransportType.LWM2M) {
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
} else {
deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials();
NoSecClientCredential client = new NoSecClientCredential();
client.setEndpoint(credentials.getCredentialsId());
lwm2mCreds.setClient(client);
LwM2MBootstrapClientCredentials bootstrap = new LwM2MBootstrapClientCredentials();
bootstrap.setBootstrapServer(new NoSecBootstrapClientCredential());
bootstrap.setLwm2mServer(new NoSecBootstrapClientCredential());
lwm2mCreds.setBootstrap(bootstrap);
credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds));
}
device = tbClient.saveDeviceWithCredentials(device, credentials).get();
if (calculatedFieldsMonitoringEnabled) {
createCalculatedField(device);
}
return device;
}
private DeviceProfile getOrCreateDeviceProfile(TransportMonitoringConfig config, TransportMonitoringTarget target) {
TransportType transportType = config.getTransportType();
String profileName = String.format("%s %s (%s)", target.getNamePrefix(), transportType.getName(), target.getQueue()).trim();
DeviceProfile deviceProfile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData()
.stream().findFirst().orElse(null);
if (deviceProfile != null) {
return deviceProfile;
}
log.info("Creating new device profile '{}'", profileName);
if (transportType != TransportType.LWM2M) {
deviceProfile = new DeviceProfile();
deviceProfile.setType(DeviceProfileType.DEFAULT);
deviceProfile.setTransportType(DeviceTransportType.DEFAULT);
DeviceProfileData profileData = new DeviceProfileData();
profileData.setConfiguration(new DefaultDeviceProfileConfiguration());
profileData.setTransportConfiguration(new DefaultDeviceProfileTransportConfiguration());
deviceProfile.setProfileData(profileData);
} else {
tbClient.getResources(new PageLink(1, 0, "LwM2M Monitoring")).getData()
.stream().findFirst()
.orElseGet(() -> {
TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
log.info("Creating LwM2M resource");
return tbClient.saveResource(newResource);
});
deviceProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
}
deviceProfile.setName(profileName);
deviceProfile.setDefaultQueueName(target.getQueue());
return tbClient.saveDeviceProfile(deviceProfile);
}
private void createCalculatedField(Device device) {
log.info("Creating calculated field for device '{}'", device.getName());
CalculatedField calculatedField = new CalculatedField();
calculatedField.setName(TEST_CF_TELEMETRY_KEY);
calculatedField.setEntityId(device.getId());
calculatedField.setType(CalculatedFieldType.SCRIPT);
ScriptCalculatedFieldConfiguration configuration = new ScriptCalculatedFieldConfiguration();
Argument testDataArgument = new Argument();
testDataArgument.setRefEntityKey(new ReferencedEntityKey(TEST_TELEMETRY_KEY, ArgumentType.TS_LATEST, null));
configuration.setArguments(Map.of(
TEST_TELEMETRY_KEY, testDataArgument
));
configuration.setExpression("return { \"" + TEST_CF_TELEMETRY_KEY + "\": " + TEST_TELEMETRY_KEY + " + \"-cf\" };");
Output output = new Output();
output.setType(OutputType.TIME_SERIES);
configuration.setOutput(output);
calculatedField.setConfiguration(configuration);
calculatedField.setDebugMode(true);
tbClient.saveCalculatedField(calculatedField);
}
}

View File

@ -46,6 +46,8 @@ import java.util.stream.Collectors;
public class MonitoringReporter { public class MonitoringReporter {
private final NotificationService notificationService; private final NotificationService notificationService;
private final TbClient tbClient;
private final MonitoringEntityService entityService;
private final Map<String, Latency> latencies = new ConcurrentHashMap<>(); private final Map<String, Latency> latencies = new ConcurrentHashMap<>();
private final Map<Object, AtomicInteger> failuresCounters = new ConcurrentHashMap<>(); private final Map<Object, AtomicInteger> failuresCounters = new ConcurrentHashMap<>();
@ -62,7 +64,7 @@ public class MonitoringReporter {
@Value("${monitoring.latency.reporting_asset_id}") @Value("${monitoring.latency.reporting_asset_id}")
private String reportingAssetId; private String reportingAssetId;
public void reportLatencies(TbClient tbClient) { public void reportLatencies() {
if (latencies.isEmpty()) { if (latencies.isEmpty()) {
return; return;
} }
@ -81,15 +83,7 @@ public class MonitoringReporter {
try { try {
if (StringUtils.isBlank(reportingAssetId)) { if (StringUtils.isBlank(reportingAssetId)) {
String assetName = "[Monitoring] Latencies"; Asset monitoringAsset = entityService.getOrCreateMonitoringAsset();
Asset monitoringAsset = tbClient.findAsset(assetName).orElseGet(() -> {
Asset asset = new Asset();
asset.setType("Monitoring");
asset.setName(assetName);
asset = tbClient.saveAsset(asset);
log.info("Created monitoring asset {}", asset.getId());
return asset;
});
reportingAssetId = monitoringAsset.getId().toString(); reportingAssetId = monitoringAsset.getId().toString();
} }
@ -113,7 +107,7 @@ public class MonitoringReporter {
public void serviceFailure(Object serviceKey, Throwable error) { public void serviceFailure(Object serviceKey, Throwable error) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.error("Error occurred", error); log.error("[{}] Error occurred", serviceKey, error);
} }
int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet(); int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet();
ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount); ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount);

View File

@ -17,54 +17,27 @@ package org.thingsboard.monitoring.service.transport;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.monitoring.client.TbClient;
import org.thingsboard.monitoring.config.transport.DeviceConfig;
import org.thingsboard.monitoring.config.transport.TransportInfo; import org.thingsboard.monitoring.config.transport.TransportInfo;
import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig;
import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget;
import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.config.transport.TransportType;
import org.thingsboard.monitoring.service.BaseHealthChecker; import org.thingsboard.monitoring.service.BaseHealthChecker;
import org.thingsboard.monitoring.util.ResourceUtils;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential;
import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
@Slf4j @Slf4j
public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> { public abstract class TransportHealthChecker<C extends TransportMonitoringConfig> extends BaseHealthChecker<C, TransportMonitoringTarget> {
@Value("${monitoring.calculated_fields.enabled:true}")
private boolean calculatedFieldsMonitoringEnabled;
public TransportHealthChecker(C config, TransportMonitoringTarget target) { public TransportHealthChecker(C config, TransportMonitoringTarget target) {
super(config, target); super(config, target);
} }
@Override @Override
protected void initialize(TbClient tbClient) { protected void initialize() {
Device device = getOrCreateDevice(tbClient); entityService.checkEntities(config, target);
DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId())
.orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId()));
DeviceConfig deviceConfig = new DeviceConfig();
deviceConfig.setId(device.getId().toString());
deviceConfig.setName(device.getName());
deviceConfig.setCredentials(credentials);
target.setDevice(deviceConfig);
} }
@Override @Override
@ -74,7 +47,7 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
@Override @Override
protected Object getInfo() { protected Object getInfo() {
return new TransportInfo(getTransportType(), target.getBaseUrl(), target.getQueue()); return new TransportInfo(getTransportType(), target);
} }
@Override @Override
@ -84,80 +57,9 @@ public abstract class TransportHealthChecker<C extends TransportMonitoringConfig
protected abstract TransportType getTransportType(); protected abstract TransportType getTransportType();
@Override
private Device getOrCreateDevice(TbClient tbClient) { protected boolean isCfMonitoringEnabled() {
TransportType transportType = config.getTransportType(); return calculatedFieldsMonitoringEnabled;
String deviceName = String.format("%s (%s) - %s", transportType.getName(), target.getQueue(), target.getBaseUrl());
Device device = tbClient.getTenantDevice(deviceName).orElse(null);
if (device != null) {
return device;
}
log.info("Creating new device '{}'", deviceName);
device = new Device();
device.setName(deviceName);
DeviceCredentials credentials = new DeviceCredentials();
credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20));
DeviceData deviceData = new DeviceData();
deviceData.setConfiguration(new DefaultDeviceConfiguration());
DeviceProfile deviceProfile = getOrCreateDeviceProfile(tbClient);
device.setType(deviceProfile.getName());
device.setDeviceProfileId(deviceProfile.getId());
if (transportType != TransportType.LWM2M) {
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
} else {
deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration());
credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS);
LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials();
NoSecClientCredential client = new NoSecClientCredential();
client.setEndpoint(credentials.getCredentialsId());
lwm2mCreds.setClient(client);
LwM2MBootstrapClientCredentials bootstrap = new LwM2MBootstrapClientCredentials();
bootstrap.setBootstrapServer(new NoSecBootstrapClientCredential());
bootstrap.setLwm2mServer(new NoSecBootstrapClientCredential());
lwm2mCreds.setBootstrap(bootstrap);
credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds));
}
return tbClient.saveDeviceWithCredentials(device, credentials).get();
}
private DeviceProfile getOrCreateDeviceProfile(TbClient tbClient) {
TransportType transportType = config.getTransportType();
String profileName = String.format("%s (%s)", transportType.getName(), target.getQueue());
DeviceProfile deviceProfile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData()
.stream().findFirst().orElse(null);
if (deviceProfile != null) {
return deviceProfile;
}
log.info("Creating new device profile '{}'", profileName);
if (transportType != TransportType.LWM2M) {
deviceProfile = new DeviceProfile();
deviceProfile.setType(DeviceProfileType.DEFAULT);
deviceProfile.setTransportType(DeviceTransportType.DEFAULT);
DeviceProfileData profileData = new DeviceProfileData();
profileData.setConfiguration(new DefaultDeviceProfileConfiguration());
profileData.setTransportConfiguration(new DefaultDeviceProfileTransportConfiguration());
deviceProfile.setProfileData(profileData);
} else {
tbClient.getResources(new PageLink(1, 0, "LwM2M Monitoring")).getData()
.stream().findFirst()
.orElseGet(() -> {
TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class);
log.info("Creating LwM2M resource");
return tbClient.saveResource(newResource);
});
deviceProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class);
}
deviceProfile.setName(profileName);
deviceProfile.setDefaultQueueName(target.getQueue());
return tbClient.saveDeviceProfile(deviceProfile);
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.monitoring.util; package org.thingsboard.monitoring.util;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
@ -24,14 +25,21 @@ public class ResourceUtils {
@SneakyThrows @SneakyThrows
public static <T> T getResource(String path, Class<T> type) { public static <T> T getResource(String path, Class<T> type) {
InputStream resource = ResourceUtils.class.getClassLoader().getResourceAsStream(path); InputStream resource = getResourceStream(path);
if (resource == null) {
throw new IllegalArgumentException("Resource not found for path " + path);
}
return JacksonUtil.OBJECT_MAPPER.readValue(resource, type); return JacksonUtil.OBJECT_MAPPER.readValue(resource, type);
} }
@SneakyThrows
public static JsonNode getResource(String path) {
InputStream resource = getResourceStream(path);
return JacksonUtil.OBJECT_MAPPER.readTree(resource);
}
public static InputStream getResourceAsStream(String path) { public static InputStream getResourceAsStream(String path) {
return getResourceStream(path);
}
private static InputStream getResourceStream(String path) {
InputStream resource = ResourceUtils.class.getClassLoader().getResourceAsStream(path); InputStream resource = ResourceUtils.class.getClassLoader().getResourceAsStream(path);
if (resource == null) { if (resource == null) {
throw new IllegalArgumentException("Resource not found for path " + path); throw new IllegalArgumentException("Resource not found for path " + path);

View File

@ -1,257 +1,232 @@
{ {
"version": 1,
"ruleChain": { "ruleChain": {
"additionalInfo": null,
"name": "Root Rule Chain", "name": "Root Rule Chain",
"type": "CORE", "type": "CORE",
"firstRuleNodeId": null, "firstRuleNodeId": null,
"root": false, "root": false,
"debugMode": false, "debugMode": false,
"configuration": null, "configuration": null,
"externalId": null "additionalInfo": null
}, },
"metadata": { "metadata": {
"firstNodeIndex": 12, "firstNodeIndex": 9,
"nodes": [ "nodes": [
{ {
"additionalInfo": {
"description": null,
"layoutX": 1202,
"layoutY": 221
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries", "name": "Save Timeseries",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1735310701003
},
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 1, "configurationVersion": 1,
"configuration": { "configuration": {
"defaultTTL": 0, "defaultTTL": 0,
"useServerTs": false,
"processingSettings": { "processingSettings": {
"type": "ON_EVERY_MESSAGE" "type": "ON_EVERY_MESSAGE"
} }
}, },
"externalId": null "additionalInfo": {
"description": null,
"layoutX": 1202,
"layoutY": 221
}
}, },
{ {
"additionalInfo": {
"layoutX": 1000,
"layoutY": 167
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "Save Attributes", "name": "Save Attributes",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 3, "configurationVersion": 3,
"configuration": { "configuration": {
"scope": "CLIENT_SCOPE",
"notifyDevice": false,
"processingSettings": { "processingSettings": {
"type": "ON_EVERY_MESSAGE" "type": "ON_EVERY_MESSAGE"
}, },
"scope": "CLIENT_SCOPE",
"notifyDevice": false,
"sendAttributesUpdatedNotification": false, "sendAttributesUpdatedNotification": false,
"updateAttributesOnlyOnValueChange": false "updateAttributesOnlyOnValueChange": false
}, },
"externalId": null "additionalInfo": {
"layoutX": 1000,
"layoutY": 167
}
}, },
{ {
"additionalInfo": {
"layoutX": 566,
"layoutY": 302
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "Message Type Switch", "name": "Message Type Switch",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"version": 0 "version": 0
}, },
"externalId": null "additionalInfo": {
"layoutX": 566,
"layoutY": 302
}
}, },
{ {
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device",
"debugSettings": null,
"singletonMode": false,
"queueName": null,
"configurationVersion": 0,
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
"tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
},
"additionalInfo": { "additionalInfo": {
"layoutX": 1000, "layoutX": 1000,
"layoutY": 381 "layoutY": 381
}, }
},
{
"type": "org.thingsboard.rule.engine.action.TbLogNode", "type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device", "name": "Log Other",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"scriptLang": "TBEL", "scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
"tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
}, },
"externalId": null
},
{
"additionalInfo": { "additionalInfo": {
"layoutX": 1000, "layoutX": 1000,
"layoutY": 494 "layoutY": 494
}, }
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log Other",
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"scriptLang": "TBEL",
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);",
"tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
},
"externalId": null
}, },
{ {
"additionalInfo": {
"layoutX": 1000,
"layoutY": 583
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"name": "RPC Call Request", "name": "RPC Call Request",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"timeoutInSeconds": 60 "timeoutInSeconds": 60
}, },
"externalId": null "additionalInfo": {
"layoutX": 1000,
"layoutY": 583
}
}, },
{ {
"additionalInfo": {
"layoutX": 255,
"layoutY": 301
},
"type": "org.thingsboard.rule.engine.filter.TbOriginatorTypeFilterNode",
"name": "Is Entity Group",
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"originatorTypes": [
"ENTITY_GROUP"
]
},
"externalId": null
},
{
"additionalInfo": {
"layoutX": 319,
"layoutY": 109
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode",
"name": "Post attributes or RPC request",
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"messageTypes": [
"POST_ATTRIBUTES_REQUEST",
"RPC_CALL_FROM_SERVER_TO_DEVICE"
]
},
"externalId": null
},
{
"additionalInfo": {
"layoutX": 627,
"layoutY": 108
},
"type": "org.thingsboard.rule.engine.transform.TbDuplicateMsgToGroupNode",
"name": "Duplicate To Group Entities",
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"entityGroupId": null,
"entityGroupIsMessageOriginator": true
},
"externalId": null
},
{
"additionalInfo": {
"description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.",
"layoutX": 45,
"layoutY": 359
},
"type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"name": "Device Profile Node", "name": "Device Profile Node",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1735310701003
},
"singletonMode": false, "singletonMode": false,
"configurationVersion": 0, "queueName": null,
"configurationVersion": 1,
"configuration": { "configuration": {
"persistAlarmRulesState": false, "persistAlarmRulesState": false,
"fetchAlarmRulesStateOnStart": false "fetchAlarmRulesStateOnStart": false
}, },
"externalId": null "additionalInfo": {
"description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.",
"layoutX": 45,
"layoutY": 359
}
}, },
{ {
"additionalInfo": {
"description": "",
"layoutX": 160,
"layoutY": 631
},
"type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode",
"name": "Test JS script", "name": "Test JS script",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"scriptLang": "JS", "scriptLang": "JS",
"jsScript": "var test = {\n a: 'a',\n b: 'b'\n};\nreturn test.a === 'a' && test.b === 'b';", "jsScript": "var test = {\n a: 'a',\n b: 'b'\n};\n\nreturn test.a === 'a' && test.b === 'b';",
"tbelScript": "return msg.temperature > 20;" "tbelScript": "return msg.temperature > 20;"
}, },
"externalId": null
},
{
"additionalInfo": { "additionalInfo": {
"description": "", "description": "",
"layoutX": 427, "layoutX": 251,
"layoutY": 541 "layoutY": 499
}, }
},
{
"type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode",
"name": "Test TBEL script", "name": "Test TBEL script",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"scriptLang": "TBEL", "scriptLang": "TBEL",
"jsScript": "return msg.temperature > 20;", "jsScript": "return msg.temperature > 20;",
"tbelScript": "var a = \"a\";\nvar b = \"b\";\nreturn a.equals(\"a\") && b.equals(\"b\");" "tbelScript": "var a = \"a\";\nvar b = \"b\";\nreturn a.equals(\"a\") && b.equals(\"b\");"
}, },
"externalId": null
},
{
"additionalInfo": { "additionalInfo": {
"description": "", "description": "",
"layoutX": 40, "layoutX": 317,
"layoutY": 252 "layoutY": 355
}, }
},
{
"type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
"name": "Add arrival timestamp", "name": "Add arrival timestamp",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1744642101587
},
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"scriptLang": "TBEL", "scriptLang": "TBEL",
"jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};",
"tbelScript": "metadata.arrivalTs = Date.now();\nreturn {msg: msg, metadata: metadata, msgType: msgType};" "tbelScript": "metadata.arrivalTs = Date.now();\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
}, },
"externalId": null
},
{
"additionalInfo": { "additionalInfo": {
"description": "", "description": "",
"layoutX": 1467, "layoutX": 40,
"layoutY": 267 "layoutY": 252
}, }
},
{
"type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode",
"name": "Calculate additional latencies", "name": "Calculate additional latencies",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1735310701003
},
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"scriptLang": "TBEL", "scriptLang": "TBEL",
"jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};",
"tbelScript": "var arrivalLatency = metadata.arrivalTs - metadata.ts;\nvar processingTime = Date.now() - metadata.arrivalTs;\nmsg = {\n arrivalLatency: arrivalLatency,\n processingTime: processingTime\n};\nreturn {msg: msg, metadata: metadata, msgType: msgType};" "tbelScript": "var arrivalLatency = metadata.arrivalTs - metadata.ts;\nvar processingTime = Date.now() - metadata.arrivalTs;\nmsg = {\n arrivalLatency: arrivalLatency,\n processingTime: processingTime\n};\nreturn {msg: msg, metadata: metadata, msgType: msgType};"
}, },
"externalId": null
},
{
"additionalInfo": { "additionalInfo": {
"description": "", "description": "",
"layoutX": 1438, "layoutX": 1467,
"layoutY": 403 "layoutY": 267
}, }
},
{
"type": "org.thingsboard.rule.engine.transform.TbChangeOriginatorNode", "type": "org.thingsboard.rule.engine.transform.TbChangeOriginatorNode",
"name": "To latencies asset", "name": "To latencies asset",
"debugSettings": null,
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 0, "configurationVersion": 0,
"configuration": { "configuration": {
"originatorSource": "ENTITY", "originatorSource": "ENTITY",
@ -269,64 +244,79 @@
"fetchLastLevelOnly": false "fetchLastLevelOnly": false
} }
}, },
"externalId": null "additionalInfo": {
"description": "",
"layoutX": 1438,
"layoutY": 403
}
}, },
{ {
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1735310701003
},
"singletonMode": false,
"queueName": null,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0,
"processingSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"additionalInfo": { "additionalInfo": {
"description": null, "description": null,
"layoutX": 1458, "layoutX": 1458,
"layoutY": 505 "layoutY": 505
}, }
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"singletonMode": false,
"configurationVersion": 1,
"configuration": {
"defaultTTL": 0,
"useServerTs": false,
"processingSettings": {
"type": "ON_EVERY_MESSAGE"
}
},
"externalId": null
}, },
{ {
"type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode",
"name": "Has testData",
"debugSettings": null,
"singletonMode": false,
"queueName": null,
"configurationVersion": 0,
"configuration": {
"messageNames": [
"testData",
"testDataCf"
],
"metadataNames": [],
"checkAllKeys": false
},
"additionalInfo": { "additionalInfo": {
"description": "", "description": "",
"layoutX": 928, "layoutX": 928,
"layoutY": 266 "layoutY": 266
}, }
"type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode",
"name": "Has testData",
"singletonMode": false,
"configurationVersion": 0,
"configuration": {
"messageNames": [
"testData"
],
"metadataNames": [],
"checkAllKeys": true
},
"externalId": null
}, },
{ {
"additionalInfo": {
"description": null,
"layoutX": 1203,
"layoutY": 327
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries with TTL", "name": "Save Timeseries with TTL",
"debugSettings": {
"failuresEnabled": true,
"allEnabled": false,
"allEnabledUntil": 1742305233839
},
"singletonMode": false, "singletonMode": false,
"queueName": null,
"configurationVersion": 1, "configurationVersion": 1,
"configuration": { "configuration": {
"defaultTTL": 180,
"useServerTs": false,
"processingSettings": { "processingSettings": {
"type": "ON_EVERY_MESSAGE" "type": "ON_EVERY_MESSAGE"
} },
"defaultTTL": 60,
"useServerTs": null
}, },
"externalId": null "additionalInfo": {
"description": "",
"layoutX": 1203,
"layoutY": 327
}
} }
], ],
"connections": [ "connections": [
@ -352,23 +342,13 @@
}, },
{ {
"fromIndex": 2, "fromIndex": 2,
"toIndex": 16, "toIndex": 13,
"type": "Post telemetry" "type": "Post telemetry"
}, },
{
"fromIndex": 6,
"toIndex": 2,
"type": "False"
},
{ {
"fromIndex": 6, "fromIndex": 6,
"toIndex": 7, "toIndex": 7,
"type": "True" "type": "Success"
},
{
"fromIndex": 7,
"toIndex": 2,
"type": "False"
}, },
{ {
"fromIndex": 7, "fromIndex": 7,
@ -378,54 +358,39 @@
{ {
"fromIndex": 8, "fromIndex": 8,
"toIndex": 2, "toIndex": 2,
"type": "Success" "type": "True"
}, },
{ {
"fromIndex": 9, "fromIndex": 9,
"toIndex": 10, "toIndex": 6,
"type": "Success" "type": "Success"
}, },
{ {
"fromIndex": 10, "fromIndex": 10,
"toIndex": 11, "toIndex": 11,
"type": "True" "type": "Success"
}, },
{ {
"fromIndex": 11, "fromIndex": 11,
"toIndex": 6, "toIndex": 12,
"type": "True"
},
{
"fromIndex": 12,
"toIndex": 9,
"type": "Success" "type": "Success"
}, },
{ {
"fromIndex": 13, "fromIndex": 13,
"toIndex": 14,
"type": "Success"
},
{
"fromIndex": 14,
"toIndex": 15,
"type": "Success"
},
{
"fromIndex": 16,
"toIndex": 0, "toIndex": 0,
"type": "False" "type": "False"
}, },
{ {
"fromIndex": 16, "fromIndex": 13,
"toIndex": 17, "toIndex": 14,
"type": "True" "type": "True"
}, },
{ {
"fromIndex": 17, "fromIndex": 14,
"toIndex": 13, "toIndex": 10,
"type": "Success" "type": "Success"
} }
], ],
"ruleChainConnections": null "ruleChainConnections": null
} }
} }

View File

@ -57,6 +57,8 @@ monitoring:
queue: '${MQTT_TRANSPORT_USED_QUEUE:Main}' queue: '${MQTT_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
# monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc. # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc.
@ -72,6 +74,8 @@ monitoring:
queue: '${COAP_TRANSPORT_USED_QUEUE:Main}' queue: '${COAP_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${COAP_TRANSPORT_TARGET_NAME_PREFIX:}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
# monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc. # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc.
@ -87,6 +91,8 @@ monitoring:
queue: '${HTTP_TRANSPORT_USED_QUEUE:Main}' queue: '${HTTP_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${HTTP_TRANSPORT_TARGET_NAME_PREFIX:}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
# monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc. # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc.
@ -102,12 +108,17 @@ monitoring:
queue: '${LWM2M_TRANSPORT_USED_QUEUE:Main}' queue: '${LWM2M_TRANSPORT_USED_QUEUE:Main}'
# Whether to monitor IPs associated with the domain from base url # Whether to monitor IPs associated with the domain from base url
check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}'
# Prefix for the target device name
name_prefix: '${LWM2M_TRANSPORT_TARGET_NAME_PREFIX:}'
# To add more targets, use following environment variables: # To add more targets, use following environment variables:
# monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc.
edqs: edqs:
enabled: "${EDQS_MONITORING_ENABLED:false}" enabled: "${EDQS_MONITORING_ENABLED:false}"
calculated_fields:
enabled: "${CALCULATED_FIELDS_MONITORING_ENABLED:true}"
notifications: notifications:
message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}' message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}'
slack: slack:

View File

@ -55,9 +55,9 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.EntityViewInfo; import org.thingsboard.server.common.data.EntityViewInfo;
import org.thingsboard.server.common.data.EventInfo; import org.thingsboard.server.common.data.EventInfo;
import org.thingsboard.server.common.data.ResourceExportData;
import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackage;
import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.OtaPackageInfo;
import org.thingsboard.server.common.data.ResourceExportData;
import org.thingsboard.server.common.data.ResourceSubType; import org.thingsboard.server.common.data.ResourceSubType;
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
@ -86,6 +86,7 @@ import org.thingsboard.server.common.data.asset.AssetProfileInfo;
import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.common.data.asset.AssetSearchQuery;
import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.audit.AuditLog; import org.thingsboard.server.common.data.audit.AuditLog;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.device.DeviceSearchQuery;
import org.thingsboard.server.common.data.domain.Domain; import org.thingsboard.server.common.data.domain.Domain;
import org.thingsboard.server.common.data.domain.DomainInfo; import org.thingsboard.server.common.data.domain.DomainInfo;
@ -3765,7 +3766,7 @@ public class RestClient implements Closeable {
} }
public PageData<TbResourceInfo> getImages(PageLink pageLink, boolean includeSystemImages) { public PageData<TbResourceInfo> getImages(PageLink pageLink, boolean includeSystemImages) {
return this.getImages(pageLink, null, includeSystemImages); return this.getImages(pageLink, null, includeSystemImages);
} }
public PageData<TbResourceInfo> getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) { public PageData<TbResourceInfo> getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) {
@ -4056,6 +4057,21 @@ public class RestClient implements Closeable {
timeout).getBody(); timeout).getBody();
} }
public CalculatedField saveCalculatedField(CalculatedField calculatedField) {
return restTemplate.postForEntity(baseURL + "/api/calculatedField", calculatedField, CalculatedField.class).getBody();
}
public PageData<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId, PageLink pageLink) {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);
return restTemplate.exchange(
baseURL + "/api/" + entityId.getEntityType() + "/" + entityId.getId() + "/calculatedFields?" + getUrlParams(pageLink),
HttpMethod.GET, HttpEntity.EMPTY,
new ParameterizedTypeReference<PageData<CalculatedField>>() {
}, params).getBody();
}
private String getTimeUrlParams(TimePageLink pageLink) { private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink); String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) { if (pageLink.getStartTime() != null) {