diff --git a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json index a299fa4c38..a37b1fc865 100644 --- a/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json +++ b/application/src/main/data/json/tenant/rule_chains/root_rule_chain.json @@ -8,7 +8,7 @@ "configuration": null }, "metadata": { - "firstNodeIndex": 2, + "firstNodeIndex": 6, "nodes": [ { "additionalInfo": { @@ -82,9 +82,28 @@ "configuration": { "timeoutInSeconds": 60 } + }, + { + "additionalInfo": { + "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", + "layoutX": 204, + "layoutY": 240 + }, + "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", + "name": "Device Profile Node", + "debugMode": false, + "configuration": { + "persistAlarmRulesState": false, + "fetchAlarmRulesStateOnStart": false + } } ], "connections": [ + { + "fromIndex": 6, + "toIndex": 2, + "type": "Success" + }, { "fromIndex": 2, "toIndex": 4, diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 0e524e9e5c..7bcf38080e 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -181,6 +181,7 @@ public class ThingsboardInstallService { databaseTsUpgradeService.upgradeDatabase("3.1.1"); } databaseEntitiesUpgradeService.upgradeDatabase("3.1.1"); + dataUpdateService.updateData("3.1.1"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index f0fbfb3448..bc86857c4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.service.install.update; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -23,9 +25,13 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import org.thingsboard.rule.engine.profile.TbDeviceProfileNode; +import org.thingsboard.rule.engine.profile.TbDeviceProfileNodeConfiguration; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.SearchTextBased; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UUIDBased; @@ -35,10 +41,13 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; import org.thingsboard.server.service.install.InstallScripts; import javax.annotation.Nullable; @@ -49,6 +58,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.commons.lang.StringUtils.isBlank; +import static org.thingsboard.server.service.install.DatabaseHelper.objectMapper; @Service @Profile("install") @@ -81,6 +91,10 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.0.1 to 3.1.0 ..."); tenantsEntityViewsUpdater.updateEntities(null); break; + case "3.1.1": + log.info("Updating data from version 3.1.1 to 3.2.0 ..."); + tenantsRootRuleChainUpdater.updateEntities(null); + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -107,6 +121,60 @@ public class DefaultDataUpdateService implements DataUpdateService { } }; + private PaginatedUpdater tenantsRootRuleChainUpdater = + new PaginatedUpdater() { + + @Override + protected PageData findEntities(String region, PageLink pageLink) { + return tenantService.findTenants(pageLink); + } + + @Override + protected void updateEntity(Tenant tenant) { + try { + RuleChain ruleChain = ruleChainService.getRootTenantRuleChain(tenant.getId()); + if (ruleChain == null) { + installScripts.createDefaultRuleChains(tenant.getId()); + } else { + RuleChainMetaData md = ruleChainService.loadRuleChainMetaData(tenant.getId(), ruleChain.getId()); + int oldIdx = md.getFirstNodeIndex(); + int newIdx = md.getNodes().size(); + + if (md.getNodes().size() < oldIdx) { + // Skip invalid rule chains + return; + } + + RuleNode oldFirstNode = md.getNodes().get(oldIdx); + if (oldFirstNode.getType().equals(TbDeviceProfileNode.class.getName())) { + // No need to update the rule node twice. + return; + } + + RuleNode ruleNode = new RuleNode(); + ruleNode.setRuleChainId(ruleChain.getId()); + ruleNode.setName("Device Profile Node"); + ruleNode.setType(TbDeviceProfileNode.class.getName()); + ruleNode.setDebugMode(false); + TbDeviceProfileNodeConfiguration ruleNodeConfiguration = new TbDeviceProfileNodeConfiguration().defaultConfiguration(); + ruleNode.setConfiguration(JacksonUtil.valueToTree(ruleNodeConfiguration)); + ObjectNode additionalInfo = JacksonUtil.newObjectNode(); + additionalInfo.put("description", "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type."); + additionalInfo.put("layoutX", 204); + additionalInfo.put("layoutY", 240); + ruleNode.setAdditionalInfo(additionalInfo); + + md.getNodes().add(ruleNode); + md.setFirstNodeIndex(newIdx); + md.addConnectionInfo(newIdx, oldIdx, "Success"); + ruleChainService.saveRuleChainMetaData(tenant.getId(), md); + } + } catch (Exception e) { + log.error("Unable to update Tenant", e); + } + } + }; + private PaginatedUpdater tenantsEntityViewsUpdater = new PaginatedUpdater() { @@ -121,30 +189,30 @@ public class DefaultDataUpdateService implements DataUpdateService { } }; - private void updateTenantEntityViews(TenantId tenantId) { - PageLink pageLink = new PageLink(100); - PageData pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); - boolean hasNext = true; - while (hasNext) { - List>> updateFutures = new ArrayList<>(); - for (EntityView entityView : pageData.getData()) { - updateFutures.add(updateEntityViewLatestTelemetry(entityView)); - } + private void updateTenantEntityViews(TenantId tenantId) { + PageLink pageLink = new PageLink(100); + PageData pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); + boolean hasNext = true; + while (hasNext) { + List>> updateFutures = new ArrayList<>(); + for (EntityView entityView : pageData.getData()) { + updateFutures.add(updateEntityViewLatestTelemetry(entityView)); + } - try { - Futures.allAsList(updateFutures).get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to copy latest telemetry to entity view", e); - } + try { + Futures.allAsList(updateFutures).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to copy latest telemetry to entity view", e); + } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); - } else { - hasNext = false; - } - } - } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink); + } else { + hasNext = false; + } + } + } private ListenableFuture> updateEntityViewLatestTelemetry(EntityView entityView) { EntityViewId entityId = entityView.getId(); @@ -160,13 +228,13 @@ public class DefaultDataUpdateService implements DataUpdateService { keysFuture = Futures.immediateFuture(keys); } ListenableFuture> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> { - List queries = fetchKeys.stream().filter(key -> !isBlank(key)).map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); - if (!queries.isEmpty()) { - return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries); - } else { - return Futures.immediateFuture(null); - } - }, MoreExecutors.directExecutor()); + List queries = fetchKeys.stream().filter(key -> !isBlank(key)).map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); + if (!queries.isEmpty()) { + return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries); + } else { + return Futures.immediateFuture(null); + } + }, MoreExecutors.directExecutor()); return Futures.transformAsync(latestFuture, latestValues -> { if (latestValues != null && !latestValues.isEmpty()) { ListenableFuture> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 49392f52ed..232a534e88 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -467,9 +467,20 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { ctx.addStringParameter("where_relation_type", entityFilter.getRelationType()); whereFilter += " re.relation_type = :where_relation_type AND"; } + String toOrFrom = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from"); whereFilter += " re." + (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") + "_type = :where_entity_type"; if (entityFilter.isFetchLastLevelOnly()) { - whereFilter += " and re.lvl = " + entityFilter.getMaxLevel(); + String fromOrTo = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "from" : "to"); + StringBuilder notExistsPart = new StringBuilder(); + notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr where ") + .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id") + .append(" and ") + .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type"); + if (!StringUtils.isEmpty(entityFilter.getRelationType())) { + notExistsPart.append(" and nr.relation_type = :where_relation_type"); + } + notExistsPart.append(")"); + whereFilter += " and ( re.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")"; } from = String.format(from, lvlFilter, whereFilter); String query = "( " + selectFields + from + ")"; @@ -502,14 +513,13 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { StringBuilder whereFilter = new StringBuilder(); boolean noConditions = true; + boolean single = entityFilter.getFilters() != null && entityFilter.getFilters().size() == 1; if (entityFilter.getFilters() != null && !entityFilter.getFilters().isEmpty()) { - boolean single = entityFilter.getFilters().size() == 1; int entityTypeFilterIdx = 0; for (EntityTypeFilter etf : entityFilter.getFilters()) { String etfCondition = buildEtfCondition(ctx, etf, entityFilter.getDirection(), entityTypeFilterIdx++); if (!etfCondition.isEmpty()) { if (noConditions) { - whereFilter.append(" WHERE "); noConditions = false; } else { whereFilter.append(" OR "); @@ -525,15 +535,33 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { } } if (noConditions) { - whereFilter.append(" WHERE re.") + whereFilter.append(" re.") .append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") .append("_type in (:where_entity_types").append(")"); ctx.addStringListParameter("where_entity_types", Arrays.stream(RELATION_QUERY_ENTITY_TYPES).map(EntityType::name).collect(Collectors.toList())); } - if (entityFilter.isFetchLastLevelOnly()) { - whereFilter.append(" and re.lvl = ").append(entityFilter.getMaxLevel()); + + if (!noConditions && !single) { + whereFilter = new StringBuilder().append("(").append(whereFilter).append(")"); } - from = String.format(from, lvlFilter, whereFilter); + + if (entityFilter.isFetchLastLevelOnly()) { + String toOrFrom = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from"); + String fromOrTo = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "from" : "to"); + + StringBuilder notExistsPart = new StringBuilder(); + notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr WHERE "); + notExistsPart.append(whereFilter.toString()); + notExistsPart + .append(" and ") + .append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id") + .append(" and ") + .append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type"); + + notExistsPart.append(")"); + whereFilter.append(" and ( re.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")"); + } + from = String.format(from, lvlFilter, " WHERE " + whereFilter); return "( " + selectFields + from + ")"; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java b/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java index 6b73b8c6b2..907a17157b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java @@ -18,7 +18,7 @@ package org.thingsboard.server.dao.util.mapping; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.thingsboard.server.common.data.alarm.Alarm; +import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; @@ -66,12 +66,16 @@ public class JacksonUtil { throw new IllegalArgumentException(e); } } + + public static ObjectNode newObjectNode(){ + return OBJECT_MAPPER.createObjectNode(); + } public static T clone(T value) { return fromString(toString(value), (Class) value.getClass()); } - public static JsonNode valueToTree(T alarm) { - return OBJECT_MAPPER.valueToTree(alarm); + public static JsonNode valueToTree(T value) { + return OBJECT_MAPPER.valueToTree(value); } }