diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java index f5f9e73275..c8e8d7070b 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java @@ -25,6 +25,7 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.EnableScheduling; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.monitoring.service.BaseMonitoringService; +import org.thingsboard.monitoring.service.MonitoringEntityService; import java.util.List; import java.util.Map; @@ -38,6 +39,8 @@ public class ThingsboardMonitoringApplication { @Autowired private List> monitoringServices; + @Autowired + private MonitoringEntityService entityService; @Value("${monitoring.monitoring_rate_ms}") private int monitoringRateMs; @@ -50,6 +53,9 @@ public class ThingsboardMonitoringApplication { @EventListener(ApplicationReadyEvent.class) public void startMonitoring() { + entityService.checkEntities(); + monitoringServices.forEach(BaseMonitoringService::init); + ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("monitoring-executor"); scheduler.scheduleWithFixedDelay(() -> { monitoringServices.forEach(monitoringService -> { diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java index e6a7e1b8af..0884cf7e5c 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/TbClient.java @@ -15,17 +15,15 @@ */ package org.thingsboard.monitoring.client; +import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.web.client.RestTemplateBuilder; -import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import org.thingsboard.rest.client.RestClient; import java.time.Duration; @Component -@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class TbClient extends RestClient { @Value("${monitoring.rest.username}") @@ -41,6 +39,11 @@ public class TbClient extends RestClient { .build(), baseUrl); } + @PostConstruct + private void init() { + logIn(); + } + public String logIn() { login(username, password); return getToken(); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java index 9246feac27..9bacbdfd45 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/client/WsClient.java @@ -36,8 +36,11 @@ import org.thingsboard.server.common.data.query.EntityListFilter; import javax.net.ssl.SSLParameters; import java.net.URI; import java.nio.channels.NotYetConnectedException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -48,13 +51,13 @@ import java.util.stream.Collectors; @Slf4j public class WsClient extends WebSocketClient implements AutoCloseable { - public volatile JsonNode lastMsg; + public final List lastMsgs = new ArrayList<>(); private CountDownLatch reply; private CountDownLatch update; private final Lock updateLock = new ReentrantLock(); - private long requestTimeoutMs; + private final long requestTimeoutMs; public WsClient(URI serverUri, long requestTimeoutMs) { super(serverUri); @@ -63,7 +66,6 @@ public class WsClient extends WebSocketClient implements AutoCloseable { @Override public void onOpen(ServerHandshake serverHandshake) { - } @Override @@ -73,8 +75,9 @@ public class WsClient extends WebSocketClient implements AutoCloseable { } updateLock.lock(); try { - lastMsg = JacksonUtil.toJsonNode(s); - log.trace("Received new msg: {}", lastMsg.toPrettyString()); + JsonNode msg = JacksonUtil.toJsonNode(s); + lastMsgs.add(msg); + log.trace("Received new msg: {}", msg.toPrettyString()); if (update != null) { update.countDown(); } @@ -96,11 +99,11 @@ public class WsClient extends WebSocketClient implements AutoCloseable { log.error("WebSocket client error:", e); } - public void registerWaitForUpdate() { + public void registerWaitForUpdates(int count) { updateLock.lock(); try { - lastMsg = null; - update = new CountDownLatch(1); + lastMsgs.clear(); + update = new CountDownLatch(count); } finally { updateLock.unlock(); } @@ -111,6 +114,7 @@ public class WsClient extends WebSocketClient implements AutoCloseable { public void send(String text) throws NotYetConnectedException { updateLock.lock(); try { + lastMsgs.clear(); reply = new CountDownLatch(1); } finally { updateLock.unlock(); @@ -118,19 +122,19 @@ public class WsClient extends WebSocketClient implements AutoCloseable { super.send(text); } - public WsClient subscribeForTelemetry(List devices, String key) { + public WsClient subscribeForTelemetry(List devices, List keys) { EntityDataCmd cmd = new EntityDataCmd(); cmd.setCmdId(RandomUtils.nextInt(0, 1000)); EntityListFilter devicesFilter = new EntityListFilter(); devicesFilter.setEntityType(EntityType.DEVICE); devicesFilter.setEntityList(devices.stream().map(UUID::toString).collect(Collectors.toList())); - EntityDataPageLink pageLink = new EntityDataPageLink(100,0, null, null); + EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, null); EntityDataQuery devicesQuery = new EntityDataQuery(devicesFilter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); cmd.setQuery(devicesQuery); LatestValueCmd latestCmd = new LatestValueCmd(); - latestCmd.setKeys(List.of(new EntityKey(EntityKeyType.TIME_SERIES, key))); + latestCmd.setKeys(keys.stream().map(key -> new EntityKey(EntityKeyType.TIME_SERIES, key)).toList()); cmd.setLatestCmd(latestCmd); CmdsWrapper wrapper = new CmdsWrapper(); @@ -139,12 +143,12 @@ public class WsClient extends WebSocketClient implements AutoCloseable { return this; } - public JsonNode waitForUpdate(long ms) { + public List waitForUpdates(long ms) { log.trace("update latch count: {}", update.getCount()); try { if (update.await(ms, TimeUnit.MILLISECONDS)) { log.trace("Waited for update"); - return getLastMsg(); + return getLastMsgs(); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); @@ -157,7 +161,8 @@ public class WsClient extends WebSocketClient implements AutoCloseable { try { if (reply.await(requestTimeoutMs, TimeUnit.MILLISECONDS)) { log.trace("Waited for reply"); - return getLastMsg(); + List lastMsgs = getLastMsgs(); + return lastMsgs.isEmpty() ? null : lastMsgs.get(0); } } catch (InterruptedException e) { log.debug("Failed to await reply", e); @@ -166,24 +171,30 @@ public class WsClient extends WebSocketClient implements AutoCloseable { throw new IllegalStateException("No WS reply arrived within " + requestTimeoutMs + " ms"); } - private JsonNode getLastMsg() { - if (lastMsg != null) { - JsonNode errorMsg = lastMsg.get("errorMsg"); - if (errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) { - throw new RuntimeException("WS error from server: " + errorMsg.asText()); - } else { - return lastMsg; - } - } else { - return null; + private List getLastMsgs() { + if (lastMsgs.isEmpty()) { + return lastMsgs; } + List errors = lastMsgs.stream() + .map(msg -> msg.get("errorMsg")) + .filter(errorMsg -> errorMsg != null && !errorMsg.isNull() && StringUtils.isNotEmpty(errorMsg.asText())) + .toList(); + if (!errors.isEmpty()) { + throw new RuntimeException("WS error from server: " + errors.stream() + .map(JsonNode::asText) + .collect(Collectors.joining(", "))); + } + return lastMsgs; } - public Object getTelemetryUpdate(UUID deviceId, String key) { - JsonNode lastMsg = getLastMsg(); - if (lastMsg == null || lastMsg.isNull()) return null; - EntityDataUpdate update = JacksonUtil.treeToValue(lastMsg, EntityDataUpdate.class); - return update.getLatest(deviceId, key); + public Map getLatest(UUID deviceId) { + Map updates = new HashMap<>(); + getLastMsgs().forEach(msg -> { + EntityDataUpdate update = JacksonUtil.treeToValue(msg, EntityDataUpdate.class); + Map latest = update.getLatest(deviceId); + updates.putAll(latest); + }); + return updates; } @Override diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java index 208d28ffee..6b77e1e268 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportInfo.java @@ -20,16 +20,15 @@ import lombok.Data; @Data public class TransportInfo { - private final TransportType transportType; - private final String baseUrl; - private final String queue; + private final TransportType type; + private final TransportMonitoringTarget target; @Override public String toString() { - if (queue.equals("Main")) { - return String.format("*%s* (%s)", transportType.getName(), baseUrl); + if (target.getQueue().equals("Main")) { + return String.format("*%s* (%s)", type.getName(), target.getBaseUrl()); } else { - return String.format("*%s* (%s) _%s_", transportType.getName(), baseUrl, queue); + return String.format("*%s* (%s) _%s_", type.getName(), target.getBaseUrl(), target.getQueue()); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java index 1e6a3c8509..5558f39c05 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/config/transport/TransportMonitoringTarget.java @@ -15,6 +15,7 @@ */ package org.thingsboard.monitoring.config.transport; +import com.google.common.base.Strings; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.thingsboard.monitoring.config.MonitoringTarget; @@ -28,6 +29,7 @@ public class TransportMonitoringTarget implements MonitoringTarget { private DeviceConfig device; // set manually during initialization private String queue; private boolean checkDomainIps; + private String namePrefix; @Override public UUID getDeviceId() { @@ -38,4 +40,8 @@ public class TransportMonitoringTarget implements MonitoringTarget { return StringUtils.defaultIfEmpty(queue, "Main"); } + public String getNamePrefix() { + return Strings.nullToEmpty(namePrefix); + } + } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/ServiceFailureException.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/ServiceFailureException.java index 5f46514bd1..b2592b8719 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/ServiceFailureException.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/ServiceFailureException.java @@ -15,14 +15,21 @@ */ package org.thingsboard.monitoring.data; +import lombok.Getter; + +@Getter public class ServiceFailureException extends RuntimeException { - public ServiceFailureException(Throwable cause) { + private final Object serviceKey; + + public ServiceFailureException(Object serviceKey, Throwable cause) { super(cause.getMessage(), cause); + this.serviceKey = serviceKey; } - public ServiceFailureException(String message) { + public ServiceFailureException(Object serviceKey, String message) { super(message); + this.serviceKey = serviceKey; } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java index b82dcbf54d..0706dedcf3 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/cmd/EntityDataUpdate.java @@ -19,9 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Data; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityKeyType; -import org.thingsboard.server.common.data.query.TsValue; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; @Data @@ -31,14 +33,16 @@ public class EntityDataUpdate { @JsonIgnoreProperties(ignoreUnknown = true) private List update; - public String getLatest(UUID entityId, String key) { - if (update == null) return null; - - return update.stream() + public Map getLatest(UUID entityId) { + if (update == null || update.isEmpty()) { + return Collections.emptyMap(); + } + Map result = new HashMap<>(); + update.stream() .filter(entityData -> entityData.getEntityId().getId().equals(entityId)).findFirst() .map(EntityData::getLatest).map(latest -> latest.get(EntityKeyType.TIME_SERIES)) - .map(latest -> latest.get(key)).map(TsValue::getValue) - .orElse(null); + .ifPresent(latest -> latest.forEach((key, tsValue) -> result.put(key, tsValue.getValue()))); + return result; } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java index 482b3a30fb..072310fb67 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/BaseHealthChecker.java @@ -22,7 +22,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.thingsboard.monitoring.client.TbClient; import org.thingsboard.monitoring.client.WsClient; import org.thingsboard.monitoring.config.MonitoringConfig; import org.thingsboard.monitoring.config.MonitoringTarget; @@ -46,6 +45,8 @@ public abstract class BaseHealthChecker> associates = new HashMap<>(); public static final String TEST_TELEMETRY_KEY = "testData"; + public static final String TEST_CF_TELEMETRY_KEY = "testDataCf"; @PostConstruct private void init() { info = getInfo(); } - protected abstract void initialize(TbClient tbClient); + protected abstract void initialize(); public final void check(WsClient wsClient) { log.debug("[{}] Checking", info); try { - wsClient.registerWaitForUpdate(); + int expectedUpdatesCount = isCfMonitoringEnabled() ? 2 : 1; + wsClient.registerWaitForUpdates(expectedUpdatesCount); String testValue = UUID.randomUUID().toString(); String testPayload = createTestPayload(testValue); @@ -79,16 +82,16 @@ public abstract class BaseHealthChecker latest = wsClient.getLatest(target.getDeviceId()); + if (latest.isEmpty()) { + throw new ServiceFailureException(info, "No WS update arrived within " + resultCheckTimeoutMs + " ms"); + } + String actualValue = latest.get(TEST_TELEMETRY_KEY); + if (!testValue.equals(actualValue)) { + throw new ServiceFailureException(info, "Was expecting value " + testValue + " but got " + actualValue); + } + if (isCfMonitoringEnabled()) { + String cfTestValue = testValue + "-cf"; + String actualCfValue = latest.get(TEST_CF_TELEMETRY_KEY); + if (actualCfValue == null) { + throw new ServiceFailureException(info, "No calculated field value arrived"); + } else if (!cfTestValue.equals(actualCfValue)) { + throw new ServiceFailureException(info, "Was expecting calculated field value " + cfTestValue + " but got " + actualCfValue); + } } reporter.reportLatency(Latencies.wsUpdate(getKey()), stopWatch.getTime()); } @@ -121,6 +135,9 @@ public abstract class BaseHealthChecker, T extends MonitoringTarget> { @@ -79,14 +80,15 @@ public abstract class BaseMonitoringService, T ext protected ApplicationContext applicationContext; @Value("${monitoring.edqs.enabled:false}") - private boolean edqsMonitoringEnabled; + private boolean checkEdqs; + @Value("${monitoring.calculated_fields.enabled:true}") + protected boolean checkCalculatedFields; - @PostConstruct - private void init() { + public void init() { if (configs == null || configs.isEmpty()) { return; } - tbClient.logIn(); + configs.forEach(config -> { config.getTargets().forEach(target -> { BaseHealthChecker healthChecker = initHealthChecker(target, config); @@ -104,7 +106,7 @@ public abstract class BaseMonitoringService, T ext private BaseHealthChecker initHealthChecker(T target, C config) { BaseHealthChecker healthChecker = (BaseHealthChecker) createHealthChecker(config, target); log.info("Initializing {} for {}", healthChecker.getClass().getSimpleName(), target.getBaseUrl()); - healthChecker.initialize(tbClient); + healthChecker.initialize(); devices.add(target.getDeviceId()); return healthChecker; } @@ -121,7 +123,7 @@ public abstract class BaseMonitoringService, T ext try (WsClient wsClient = wsClientFactory.createClient(accessToken)) { stopWatch.start(); - wsClient.subscribeForTelemetry(devices, TransportHealthChecker.TEST_TELEMETRY_KEY).waitForReply(); + wsClient.subscribeForTelemetry(devices, getTestTelemetryKeys()).waitForReply(); reporter.reportLatency(Latencies.WS_SUBSCRIBE, stopWatch.getTime()); for (BaseHealthChecker healthChecker : healthCheckers) { @@ -129,22 +131,17 @@ public abstract class BaseMonitoringService, T ext } } - if (edqsMonitoringEnabled) { - try { - stopWatch.start(); - checkEdqs(); - reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime()); - - reporter.serviceIsOk(MonitoredServiceKey.EDQS); - } catch (ServiceFailureException e) { - reporter.serviceFailure(MonitoredServiceKey.EDQS, e); - } catch (Exception e) { - reporter.serviceFailure(MonitoredServiceKey.GENERAL, e); - } + if (checkEdqs) { + stopWatch.start(); + checkEdqs(); + reporter.reportLatency(Latencies.EDQS_QUERY, stopWatch.getTime()); + reporter.serviceIsOk(MonitoredServiceKey.EDQS); } - reporter.reportLatencies(tbClient); + reporter.reportLatencies(); log.debug("Finished {}", getName()); + } catch (ServiceFailureException e) { + reporter.serviceFailure(e.getServiceKey(), e); } catch (Throwable error) { try { reporter.serviceFailure(MonitoredServiceKey.GENERAL, error); @@ -199,7 +196,7 @@ public abstract class BaseMonitoringService, T ext .collect(Collectors.toSet()); Set missing = Sets.difference(new HashSet<>(this.devices), devices); if (!missing.isEmpty()) { - throw new ServiceFailureException("Missing devices in the response: " + missing); + throw new ServiceFailureException(MonitoredServiceKey.EDQS, "Missing devices in the response: " + missing); } result.getData().stream() @@ -211,7 +208,7 @@ public abstract class BaseMonitoringService, T ext Stream.of("name", "type", "testData").forEach(key -> { TsValue value = values.get(key); if (value == null || StringUtils.isBlank(value.getValue())) { - throw new ServiceFailureException("Missing " + key + " for device " + entityData.getEntityId()); + throw new ServiceFailureException(MonitoredServiceKey.EDQS, "Missing " + key + " for device " + entityData.getEntityId()); } }); }); @@ -232,6 +229,10 @@ public abstract class BaseMonitoringService, T ext .collect(Collectors.toSet()); } + private List getTestTelemetryKeys() { + return checkCalculatedFields ? List.of(TEST_TELEMETRY_KEY, TEST_CF_TELEMETRY_KEY) : List.of(TEST_TELEMETRY_KEY); + } + private void stopHealthChecker(BaseHealthChecker healthChecker) throws Exception { healthChecker.destroyClient(); devices.remove(healthChecker.getTarget().getDeviceId()); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java new file mode 100644 index 0000000000..062104ecd5 --- /dev/null +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringEntityService.java @@ -0,0 +1,252 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.monitoring.service; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.RegexUtils; +import org.thingsboard.monitoring.client.TbClient; +import org.thingsboard.monitoring.config.transport.DeviceConfig; +import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig; +import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; +import org.thingsboard.monitoring.config.transport.TransportType; +import org.thingsboard.monitoring.util.ResourceUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.DeviceProfileType; +import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; +import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; +import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; +import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration; +import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials; +import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials; +import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential; +import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential; +import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; +import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.DeviceProfileData; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleChainType; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_CF_TELEMETRY_KEY; +import static org.thingsboard.monitoring.service.BaseHealthChecker.TEST_TELEMETRY_KEY; + +@Service +@Slf4j +@RequiredArgsConstructor +public class MonitoringEntityService { + + private final TbClient tbClient; + + @Value("${monitoring.calculated_fields.enabled:true}") + private boolean calculatedFieldsMonitoringEnabled; + + public void checkEntities() { + RuleChain ruleChain = tbClient.getRuleChains(RuleChainType.CORE, new PageLink(10)).getData().stream() + .filter(RuleChain::isRoot) + .findFirst().orElseThrow(); + RuleChainId ruleChainId = ruleChain.getId(); + + JsonNode ruleChainDescriptor = ResourceUtils.getResource("rule_chain.json"); + List attributeKeys = tbClient.getAttributeKeys(ruleChainId); + Map attributes = tbClient.getAttributeKvEntries(ruleChainId, attributeKeys).stream() + .collect(Collectors.toMap(KvEntry::getKey, KvEntry::getValueAsString)); + + int currentVersion = Integer.parseInt(attributes.getOrDefault("version", "0")); + int newVersion = ruleChainDescriptor.get("version").asInt(); + if (currentVersion == newVersion) { + log.info("Not updating rule chain, version is the same ({})", currentVersion); + return; + } else { + log.info("Updating rule chain '{}' from version {} to {}", ruleChain.getName(), currentVersion, newVersion); + } + + String metadataJson = RegexUtils.replace(ruleChainDescriptor.get("metadata").toString(), + "\\$\\{MONITORING:(.+?)}", matchResult -> { + String key = matchResult.group(1); + String value = attributes.get(key); + if (value == null) { + throw new IllegalArgumentException("No attribute found for key " + key); + } + log.info("Using {}: {}", key, value); + return value; + }); + RuleChainMetaData metaData = JacksonUtil.fromString(metadataJson, RuleChainMetaData.class); + metaData.setRuleChainId(ruleChainId); + tbClient.saveRuleChainMetaData(metaData); + tbClient.saveEntityAttributesV2(ruleChainId, DataConstants.SERVER_SCOPE, JacksonUtil.newObjectNode() + .put("version", newVersion)); + } + + public Asset getOrCreateMonitoringAsset() { + String assetName = "[Monitoring] Latencies"; + return tbClient.findAsset(assetName).orElseGet(() -> { + Asset asset = new Asset(); + asset.setType("Monitoring"); + asset.setName(assetName); + asset = tbClient.saveAsset(asset); + log.info("Created monitoring asset {}", asset.getId()); + return asset; + }); + } + + public void checkEntities(TransportMonitoringConfig config, TransportMonitoringTarget target) { + Device device = getOrCreateDevice(config, target); + DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId()) + .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId())); + + DeviceConfig deviceConfig = new DeviceConfig(); + deviceConfig.setId(device.getId().toString()); + deviceConfig.setName(device.getName()); + deviceConfig.setCredentials(credentials); + target.setDevice(deviceConfig); + } + + private Device getOrCreateDevice(TransportMonitoringConfig config, TransportMonitoringTarget target) { + TransportType transportType = config.getTransportType(); + String deviceName = String.format("%s %s (%s) - %s", target.getNamePrefix(), transportType.getName(), target.getQueue(), target.getBaseUrl()).trim(); + Device device = tbClient.getTenantDevice(deviceName).orElse(null); + if (device != null) { + if (calculatedFieldsMonitoringEnabled) { + CalculatedField calculatedField = tbClient.getCalculatedFieldsByEntityId(device.getId(), new PageLink(1, 0, TEST_CF_TELEMETRY_KEY)) + .getData().stream().findFirst().orElse(null); + if (calculatedField == null) { + createCalculatedField(device); + } + } + return device; + } + + log.info("Creating new device '{}'", deviceName); + device = new Device(); + device.setName(deviceName); + + DeviceCredentials credentials = new DeviceCredentials(); + credentials.setCredentialsId(RandomStringUtils.randomAlphabetic(20)); + DeviceData deviceData = new DeviceData(); + deviceData.setConfiguration(new DefaultDeviceConfiguration()); + + DeviceProfile deviceProfile = getOrCreateDeviceProfile(config, target); + device.setType(deviceProfile.getName()); + device.setDeviceProfileId(deviceProfile.getId()); + + if (transportType != TransportType.LWM2M) { + deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); + credentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + } else { + deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration()); + credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS); + LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials(); + NoSecClientCredential client = new NoSecClientCredential(); + client.setEndpoint(credentials.getCredentialsId()); + lwm2mCreds.setClient(client); + LwM2MBootstrapClientCredentials bootstrap = new LwM2MBootstrapClientCredentials(); + bootstrap.setBootstrapServer(new NoSecBootstrapClientCredential()); + bootstrap.setLwm2mServer(new NoSecBootstrapClientCredential()); + lwm2mCreds.setBootstrap(bootstrap); + credentials.setCredentialsValue(JacksonUtil.toString(lwm2mCreds)); + } + + device = tbClient.saveDeviceWithCredentials(device, credentials).get(); + if (calculatedFieldsMonitoringEnabled) { + createCalculatedField(device); + } + return device; + } + + private DeviceProfile getOrCreateDeviceProfile(TransportMonitoringConfig config, TransportMonitoringTarget target) { + TransportType transportType = config.getTransportType(); + String profileName = String.format("%s %s (%s)", target.getNamePrefix(), transportType.getName(), target.getQueue()).trim(); + DeviceProfile deviceProfile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData() + .stream().findFirst().orElse(null); + if (deviceProfile != null) { + return deviceProfile; + } + + log.info("Creating new device profile '{}'", profileName); + if (transportType != TransportType.LWM2M) { + deviceProfile = new DeviceProfile(); + deviceProfile.setType(DeviceProfileType.DEFAULT); + deviceProfile.setTransportType(DeviceTransportType.DEFAULT); + DeviceProfileData profileData = new DeviceProfileData(); + profileData.setConfiguration(new DefaultDeviceProfileConfiguration()); + profileData.setTransportConfiguration(new DefaultDeviceProfileTransportConfiguration()); + deviceProfile.setProfileData(profileData); + } else { + tbClient.getResources(new PageLink(1, 0, "LwM2M Monitoring")).getData() + .stream().findFirst() + .orElseGet(() -> { + TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class); + log.info("Creating LwM2M resource"); + return tbClient.saveResource(newResource); + }); + deviceProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class); + } + + deviceProfile.setName(profileName); + deviceProfile.setDefaultQueueName(target.getQueue()); + return tbClient.saveDeviceProfile(deviceProfile); + } + + private void createCalculatedField(Device device) { + log.info("Creating calculated field for device '{}'", device.getName()); + CalculatedField calculatedField = new CalculatedField(); + calculatedField.setName(TEST_CF_TELEMETRY_KEY); + calculatedField.setEntityId(device.getId()); + calculatedField.setType(CalculatedFieldType.SCRIPT); + ScriptCalculatedFieldConfiguration configuration = new ScriptCalculatedFieldConfiguration(); + Argument testDataArgument = new Argument(); + testDataArgument.setRefEntityKey(new ReferencedEntityKey(TEST_TELEMETRY_KEY, ArgumentType.TS_LATEST, null)); + configuration.setArguments(Map.of( + TEST_TELEMETRY_KEY, testDataArgument + )); + configuration.setExpression("return { \"" + TEST_CF_TELEMETRY_KEY + "\": " + TEST_TELEMETRY_KEY + " + \"-cf\" };"); + Output output = new Output(); + output.setType(OutputType.TIME_SERIES); + configuration.setOutput(output); + calculatedField.setConfiguration(configuration); + calculatedField.setDebugMode(true); + tbClient.saveCalculatedField(calculatedField); + } + +} diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java index 3554731e58..99cb9feb3d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/MonitoringReporter.java @@ -46,6 +46,8 @@ import java.util.stream.Collectors; public class MonitoringReporter { private final NotificationService notificationService; + private final TbClient tbClient; + private final MonitoringEntityService entityService; private final Map latencies = new ConcurrentHashMap<>(); private final Map failuresCounters = new ConcurrentHashMap<>(); @@ -62,7 +64,7 @@ public class MonitoringReporter { @Value("${monitoring.latency.reporting_asset_id}") private String reportingAssetId; - public void reportLatencies(TbClient tbClient) { + public void reportLatencies() { if (latencies.isEmpty()) { return; } @@ -81,15 +83,7 @@ public class MonitoringReporter { try { if (StringUtils.isBlank(reportingAssetId)) { - String assetName = "[Monitoring] Latencies"; - Asset monitoringAsset = tbClient.findAsset(assetName).orElseGet(() -> { - Asset asset = new Asset(); - asset.setType("Monitoring"); - asset.setName(assetName); - asset = tbClient.saveAsset(asset); - log.info("Created monitoring asset {}", asset.getId()); - return asset; - }); + Asset monitoringAsset = entityService.getOrCreateMonitoringAsset(); reportingAssetId = monitoringAsset.getId().toString(); } @@ -113,7 +107,7 @@ public class MonitoringReporter { public void serviceFailure(Object serviceKey, Throwable error) { if (log.isDebugEnabled()) { - log.error("Error occurred", error); + log.error("[{}] Error occurred", serviceKey, error); } int failuresCount = failuresCounters.computeIfAbsent(serviceKey, k -> new AtomicInteger()).incrementAndGet(); ServiceFailureNotification notification = new ServiceFailureNotification(serviceKey, error, failuresCount); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index f5e6cb5469..946c3e6d3d 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -17,54 +17,27 @@ package org.thingsboard.monitoring.service.transport; import com.fasterxml.jackson.databind.node.TextNode; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.beans.factory.annotation.Value; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.monitoring.client.TbClient; -import org.thingsboard.monitoring.config.transport.DeviceConfig; import org.thingsboard.monitoring.config.transport.TransportInfo; import org.thingsboard.monitoring.config.transport.TransportMonitoringConfig; import org.thingsboard.monitoring.config.transport.TransportMonitoringTarget; import org.thingsboard.monitoring.config.transport.TransportType; import org.thingsboard.monitoring.service.BaseHealthChecker; -import org.thingsboard.monitoring.util.ResourceUtils; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.DeviceProfileType; -import org.thingsboard.server.common.data.DeviceTransportType; -import org.thingsboard.server.common.data.TbResource; -import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials; -import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials; -import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecBootstrapClientCredential; -import org.thingsboard.server.common.data.device.credentials.lwm2m.NoSecClientCredential; -import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration; -import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; -import org.thingsboard.server.common.data.device.data.DeviceData; -import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; -import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration; -import org.thingsboard.server.common.data.device.profile.DeviceProfileData; -import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.security.DeviceCredentialsType; @Slf4j public abstract class TransportHealthChecker extends BaseHealthChecker { + @Value("${monitoring.calculated_fields.enabled:true}") + private boolean calculatedFieldsMonitoringEnabled; + public TransportHealthChecker(C config, TransportMonitoringTarget target) { super(config, target); } @Override - protected void initialize(TbClient tbClient) { - Device device = getOrCreateDevice(tbClient); - DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId()) - .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId())); - - DeviceConfig deviceConfig = new DeviceConfig(); - deviceConfig.setId(device.getId().toString()); - deviceConfig.setName(device.getName()); - deviceConfig.setCredentials(credentials); - target.setDevice(deviceConfig); + protected void initialize() { + entityService.checkEntities(config, target); } @Override @@ -74,7 +47,7 @@ public abstract class TransportHealthChecker { - TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class); - log.info("Creating LwM2M resource"); - return tbClient.saveResource(newResource); - }); - deviceProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class); - } - - deviceProfile.setName(profileName); - deviceProfile.setDefaultQueueName(target.getQueue()); - return tbClient.saveDeviceProfile(deviceProfile); + @Override + protected boolean isCfMonitoringEnabled() { + return calculatedFieldsMonitoringEnabled; } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/util/ResourceUtils.java b/monitoring/src/main/java/org/thingsboard/monitoring/util/ResourceUtils.java index 3e25cfbd39..a92033f439 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/util/ResourceUtils.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/util/ResourceUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.monitoring.util; +import com.fasterxml.jackson.databind.JsonNode; import lombok.SneakyThrows; import org.thingsboard.common.util.JacksonUtil; @@ -24,14 +25,21 @@ public class ResourceUtils { @SneakyThrows public static T getResource(String path, Class type) { - InputStream resource = ResourceUtils.class.getClassLoader().getResourceAsStream(path); - if (resource == null) { - throw new IllegalArgumentException("Resource not found for path " + path); - } + InputStream resource = getResourceStream(path); return JacksonUtil.OBJECT_MAPPER.readValue(resource, type); } + @SneakyThrows + public static JsonNode getResource(String path) { + InputStream resource = getResourceStream(path); + return JacksonUtil.OBJECT_MAPPER.readTree(resource); + } + public static InputStream getResourceAsStream(String path) { + return getResourceStream(path); + } + + private static InputStream getResourceStream(String path) { InputStream resource = ResourceUtils.class.getClassLoader().getResourceAsStream(path); if (resource == null) { throw new IllegalArgumentException("Resource not found for path " + path); diff --git a/monitoring/src/main/resources/root_rule_chain.json b/monitoring/src/main/resources/rule_chain.json similarity index 76% rename from monitoring/src/main/resources/root_rule_chain.json rename to monitoring/src/main/resources/rule_chain.json index a1c12c8e9d..2bc0e89a8a 100644 --- a/monitoring/src/main/resources/root_rule_chain.json +++ b/monitoring/src/main/resources/rule_chain.json @@ -1,257 +1,232 @@ { + "version": 1, "ruleChain": { - "additionalInfo": null, "name": "Root Rule Chain", "type": "CORE", "firstRuleNodeId": null, "root": false, "debugMode": false, "configuration": null, - "externalId": null + "additionalInfo": null }, "metadata": { - "firstNodeIndex": 12, + "firstNodeIndex": 9, "nodes": [ { - "additionalInfo": { - "description": null, - "layoutX": 1202, - "layoutY": 221 - }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1735310701003 + }, "singletonMode": false, + "queueName": null, "configurationVersion": 1, "configuration": { "defaultTTL": 0, - "useServerTs": false, "processingSettings": { "type": "ON_EVERY_MESSAGE" } }, - "externalId": null + "additionalInfo": { + "description": null, + "layoutX": 1202, + "layoutY": 221 + } }, { - "additionalInfo": { - "layoutX": 1000, - "layoutY": 167 - }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", "name": "Save Attributes", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 3, "configuration": { + "scope": "CLIENT_SCOPE", + "notifyDevice": false, "processingSettings": { "type": "ON_EVERY_MESSAGE" }, - "scope": "CLIENT_SCOPE", - "notifyDevice": false, "sendAttributesUpdatedNotification": false, "updateAttributesOnlyOnValueChange": false }, - "externalId": null + "additionalInfo": { + "layoutX": 1000, + "layoutY": 167 + } }, { - "additionalInfo": { - "layoutX": 566, - "layoutY": 302 - }, "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", "name": "Message Type Switch", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "version": 0 }, - "externalId": null + "additionalInfo": { + "layoutX": 566, + "layoutY": 302 + } }, { + "type": "org.thingsboard.rule.engine.action.TbLogNode", + "name": "Log RPC from Device", + "debugSettings": null, + "singletonMode": false, + "queueName": null, + "configurationVersion": 0, + "configuration": { + "scriptLang": "TBEL", + "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", + "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" + }, "additionalInfo": { "layoutX": 1000, "layoutY": 381 - }, + } + }, + { "type": "org.thingsboard.rule.engine.action.TbLogNode", - "name": "Log RPC from Device", + "name": "Log Other", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "scriptLang": "TBEL", "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" }, - "externalId": null - }, - { "additionalInfo": { "layoutX": 1000, "layoutY": 494 - }, - "type": "org.thingsboard.rule.engine.action.TbLogNode", - "name": "Log Other", - "singletonMode": false, - "configurationVersion": 0, - "configuration": { - "scriptLang": "TBEL", - "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);", - "tbelScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" - }, - "externalId": null + } }, { - "additionalInfo": { - "layoutX": 1000, - "layoutY": 583 - }, "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", "name": "RPC Call Request", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "timeoutInSeconds": 60 }, - "externalId": null + "additionalInfo": { + "layoutX": 1000, + "layoutY": 583 + } }, { - "additionalInfo": { - "layoutX": 255, - "layoutY": 301 - }, - "type": "org.thingsboard.rule.engine.filter.TbOriginatorTypeFilterNode", - "name": "Is Entity Group", - "singletonMode": false, - "configurationVersion": 0, - "configuration": { - "originatorTypes": [ - "ENTITY_GROUP" - ] - }, - "externalId": null - }, - { - "additionalInfo": { - "layoutX": 319, - "layoutY": 109 - }, - "type": "org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode", - "name": "Post attributes or RPC request", - "singletonMode": false, - "configurationVersion": 0, - "configuration": { - "messageTypes": [ - "POST_ATTRIBUTES_REQUEST", - "RPC_CALL_FROM_SERVER_TO_DEVICE" - ] - }, - "externalId": null - }, - { - "additionalInfo": { - "layoutX": 627, - "layoutY": 108 - }, - "type": "org.thingsboard.rule.engine.transform.TbDuplicateMsgToGroupNode", - "name": "Duplicate To Group Entities", - "singletonMode": false, - "configurationVersion": 0, - "configuration": { - "entityGroupId": null, - "entityGroupIsMessageOriginator": true - }, - "externalId": null - }, - { - "additionalInfo": { - "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", - "layoutX": 45, - "layoutY": 359 - }, "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", "name": "Device Profile Node", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1735310701003 + }, "singletonMode": false, - "configurationVersion": 0, + "queueName": null, + "configurationVersion": 1, "configuration": { "persistAlarmRulesState": false, "fetchAlarmRulesStateOnStart": false }, - "externalId": null + "additionalInfo": { + "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", + "layoutX": 45, + "layoutY": 359 + } }, { - "additionalInfo": { - "description": "", - "layoutX": 160, - "layoutY": 631 - }, "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", "name": "Test JS script", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "scriptLang": "JS", - "jsScript": "var test = {\n a: 'a',\n b: 'b'\n};\nreturn test.a === 'a' && test.b === 'b';", + "jsScript": "var test = {\n a: 'a',\n b: 'b'\n};\n\nreturn test.a === 'a' && test.b === 'b';", "tbelScript": "return msg.temperature > 20;" }, - "externalId": null - }, - { "additionalInfo": { "description": "", - "layoutX": 427, - "layoutY": 541 - }, + "layoutX": 251, + "layoutY": 499 + } + }, + { "type": "org.thingsboard.rule.engine.filter.TbJsFilterNode", "name": "Test TBEL script", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "scriptLang": "TBEL", "jsScript": "return msg.temperature > 20;", "tbelScript": "var a = \"a\";\nvar b = \"b\";\nreturn a.equals(\"a\") && b.equals(\"b\");" }, - "externalId": null - }, - { "additionalInfo": { "description": "", - "layoutX": 40, - "layoutY": 252 - }, + "layoutX": 317, + "layoutY": 355 + } + }, + { "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", "name": "Add arrival timestamp", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1744642101587 + }, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "scriptLang": "TBEL", "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", "tbelScript": "metadata.arrivalTs = Date.now();\nreturn {msg: msg, metadata: metadata, msgType: msgType};" }, - "externalId": null - }, - { "additionalInfo": { "description": "", - "layoutX": 1467, - "layoutY": 267 - }, + "layoutX": 40, + "layoutY": 252 + } + }, + { "type": "org.thingsboard.rule.engine.transform.TbTransformMsgNode", "name": "Calculate additional latencies", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1735310701003 + }, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "scriptLang": "TBEL", "jsScript": "return {msg: msg, metadata: metadata, msgType: msgType};", "tbelScript": "var arrivalLatency = metadata.arrivalTs - metadata.ts;\nvar processingTime = Date.now() - metadata.arrivalTs;\nmsg = {\n arrivalLatency: arrivalLatency,\n processingTime: processingTime\n};\nreturn {msg: msg, metadata: metadata, msgType: msgType};" }, - "externalId": null - }, - { "additionalInfo": { "description": "", - "layoutX": 1438, - "layoutY": 403 - }, + "layoutX": 1467, + "layoutY": 267 + } + }, + { "type": "org.thingsboard.rule.engine.transform.TbChangeOriginatorNode", "name": "To latencies asset", + "debugSettings": null, "singletonMode": false, + "queueName": null, "configurationVersion": 0, "configuration": { "originatorSource": "ENTITY", @@ -269,64 +244,79 @@ "fetchLastLevelOnly": false } }, - "externalId": null + "additionalInfo": { + "description": "", + "layoutX": 1438, + "layoutY": 403 + } }, { + "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", + "name": "Save Timeseries", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1735310701003 + }, + "singletonMode": false, + "queueName": null, + "configurationVersion": 1, + "configuration": { + "defaultTTL": 0, + "processingSettings": { + "type": "ON_EVERY_MESSAGE" + } + }, "additionalInfo": { "description": null, "layoutX": 1458, "layoutY": 505 - }, - "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", - "name": "Save Timeseries", - "singletonMode": false, - "configurationVersion": 1, - "configuration": { - "defaultTTL": 0, - "useServerTs": false, - "processingSettings": { - "type": "ON_EVERY_MESSAGE" - } - }, - "externalId": null + } }, { + "type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode", + "name": "Has testData", + "debugSettings": null, + "singletonMode": false, + "queueName": null, + "configurationVersion": 0, + "configuration": { + "messageNames": [ + "testData", + "testDataCf" + ], + "metadataNames": [], + "checkAllKeys": false + }, "additionalInfo": { "description": "", "layoutX": 928, "layoutY": 266 - }, - "type": "org.thingsboard.rule.engine.filter.TbCheckMessageNode", - "name": "Has testData", - "singletonMode": false, - "configurationVersion": 0, - "configuration": { - "messageNames": [ - "testData" - ], - "metadataNames": [], - "checkAllKeys": true - }, - "externalId": null + } }, { - "additionalInfo": { - "description": null, - "layoutX": 1203, - "layoutY": 327 - }, "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", "name": "Save Timeseries with TTL", + "debugSettings": { + "failuresEnabled": true, + "allEnabled": false, + "allEnabledUntil": 1742305233839 + }, "singletonMode": false, + "queueName": null, "configurationVersion": 1, "configuration": { - "defaultTTL": 180, - "useServerTs": false, "processingSettings": { "type": "ON_EVERY_MESSAGE" - } + }, + "defaultTTL": 60, + "useServerTs": null }, - "externalId": null + "additionalInfo": { + "description": "", + "layoutX": 1203, + "layoutY": 327 + } } ], "connections": [ @@ -352,23 +342,13 @@ }, { "fromIndex": 2, - "toIndex": 16, + "toIndex": 13, "type": "Post telemetry" }, - { - "fromIndex": 6, - "toIndex": 2, - "type": "False" - }, { "fromIndex": 6, "toIndex": 7, - "type": "True" - }, - { - "fromIndex": 7, - "toIndex": 2, - "type": "False" + "type": "Success" }, { "fromIndex": 7, @@ -378,54 +358,39 @@ { "fromIndex": 8, "toIndex": 2, - "type": "Success" + "type": "True" }, { "fromIndex": 9, - "toIndex": 10, + "toIndex": 6, "type": "Success" }, { "fromIndex": 10, "toIndex": 11, - "type": "True" + "type": "Success" }, { "fromIndex": 11, - "toIndex": 6, - "type": "True" - }, - { - "fromIndex": 12, - "toIndex": 9, + "toIndex": 12, "type": "Success" }, { "fromIndex": 13, - "toIndex": 14, - "type": "Success" - }, - { - "fromIndex": 14, - "toIndex": 15, - "type": "Success" - }, - { - "fromIndex": 16, "toIndex": 0, "type": "False" }, { - "fromIndex": 16, - "toIndex": 17, + "fromIndex": 13, + "toIndex": 14, "type": "True" }, { - "fromIndex": 17, - "toIndex": 13, + "fromIndex": 14, + "toIndex": 10, "type": "Success" } ], "ruleChainConnections": null } -} +} \ No newline at end of file diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 6cc2e79cb4..0955862f46 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -57,6 +57,8 @@ monitoring: queue: '${MQTT_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' + # Prefix for the target device name + name_prefix: '${MQTT_TRANSPORT_TARGET_NAME_PREFIX:}' # To add more targets, use following environment variables: # monitoring.transports.mqtt.targets[1].base_url, monitoring.transports.mqtt.targets[2].base_url, etc. @@ -72,6 +74,8 @@ monitoring: queue: '${COAP_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' + # Prefix for the target device name + name_prefix: '${COAP_TRANSPORT_TARGET_NAME_PREFIX:}' # To add more targets, use following environment variables: # monitoring.transports.coap.targets[1].base_url, monitoring.transports.coap.targets[2].base_url, etc. @@ -87,6 +91,8 @@ monitoring: queue: '${HTTP_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' + # Prefix for the target device name + name_prefix: '${HTTP_TRANSPORT_TARGET_NAME_PREFIX:}' # To add more targets, use following environment variables: # monitoring.transports.http.targets[1].base_url, monitoring.transports.http.targets[2].base_url, etc. @@ -102,12 +108,17 @@ monitoring: queue: '${LWM2M_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' + # Prefix for the target device name + name_prefix: '${LWM2M_TRANSPORT_TARGET_NAME_PREFIX:}' # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. edqs: enabled: "${EDQS_MONITORING_ENABLED:false}" + calculated_fields: + enabled: "${CALCULATED_FIELDS_MONITORING_ENABLED:true}" + notifications: message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}' slack: diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 4f7538d205..633dbe9c57 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -55,9 +55,9 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityViewInfo; import org.thingsboard.server.common.data.EventInfo; -import org.thingsboard.server.common.data.ResourceExportData; import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackageInfo; +import org.thingsboard.server.common.data.ResourceExportData; import org.thingsboard.server.common.data.ResourceSubType; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.StringUtils; @@ -86,6 +86,7 @@ import org.thingsboard.server.common.data.asset.AssetProfileInfo; import org.thingsboard.server.common.data.asset.AssetSearchQuery; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.domain.Domain; import org.thingsboard.server.common.data.domain.DomainInfo; @@ -3765,7 +3766,7 @@ public class RestClient implements Closeable { } public PageData getImages(PageLink pageLink, boolean includeSystemImages) { - return this.getImages(pageLink, null, includeSystemImages); + return this.getImages(pageLink, null, includeSystemImages); } public PageData getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) { @@ -4056,6 +4057,21 @@ public class RestClient implements Closeable { timeout).getBody(); } + public CalculatedField saveCalculatedField(CalculatedField calculatedField) { + return restTemplate.postForEntity(baseURL + "/api/calculatedField", calculatedField, CalculatedField.class).getBody(); + } + + public PageData getCalculatedFieldsByEntityId(EntityId entityId, PageLink pageLink) { + Map params = new HashMap<>(); + addPageLinkToParam(params, pageLink); + return restTemplate.exchange( + baseURL + "/api/" + entityId.getEntityType() + "/" + entityId.getId() + "/calculatedFields?" + getUrlParams(pageLink), + HttpMethod.GET, HttpEntity.EMPTY, + new ParameterizedTypeReference>() { + }, params).getBody(); + + } + private String getTimeUrlParams(TimePageLink pageLink) { String urlParams = getUrlParams(pageLink); if (pageLink.getStartTime() != null) {