Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2020-10-08 11:46:01 +03:00
commit c4b89e29ef
5 changed files with 160 additions and 40 deletions

View File

@ -8,7 +8,7 @@
"configuration": null
},
"metadata": {
"firstNodeIndex": 2,
"firstNodeIndex": 6,
"nodes": [
{
"additionalInfo": {
@ -82,9 +82,28 @@
"configuration": {
"timeoutInSeconds": 60
}
},
{
"additionalInfo": {
"description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.",
"layoutX": 204,
"layoutY": 240
},
"type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"name": "Device Profile Node",
"debugMode": false,
"configuration": {
"persistAlarmRulesState": false,
"fetchAlarmRulesStateOnStart": false
}
}
],
"connections": [
{
"fromIndex": 6,
"toIndex": 2,
"type": "Success"
},
{
"fromIndex": 2,
"toIndex": 4,

View File

@ -181,6 +181,7 @@ public class ThingsboardInstallService {
databaseTsUpgradeService.upgradeDatabase("3.1.1");
}
databaseEntitiesUpgradeService.upgradeDatabase("3.1.1");
dataUpdateService.updateData("3.1.1");
log.info("Updating system data...");
systemDataLoaderService.updateSystemWidgets();
break;

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.service.install.update;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -23,9 +25,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.profile.TbDeviceProfileNode;
import org.thingsboard.rule.engine.profile.TbDeviceProfileNodeConfiguration;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.SearchTextBased;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UUIDBased;
@ -35,10 +41,13 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import org.thingsboard.server.service.install.InstallScripts;
import javax.annotation.Nullable;
@ -49,6 +58,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.thingsboard.server.service.install.DatabaseHelper.objectMapper;
@Service
@Profile("install")
@ -81,6 +91,10 @@ public class DefaultDataUpdateService implements DataUpdateService {
log.info("Updating data from version 3.0.1 to 3.1.0 ...");
tenantsEntityViewsUpdater.updateEntities(null);
break;
case "3.1.1":
log.info("Updating data from version 3.1.1 to 3.2.0 ...");
tenantsRootRuleChainUpdater.updateEntities(null);
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
}
@ -107,6 +121,60 @@ public class DefaultDataUpdateService implements DataUpdateService {
}
};
private PaginatedUpdater<String, Tenant> tenantsRootRuleChainUpdater =
new PaginatedUpdater<String, Tenant>() {
@Override
protected PageData<Tenant> findEntities(String region, PageLink pageLink) {
return tenantService.findTenants(pageLink);
}
@Override
protected void updateEntity(Tenant tenant) {
try {
RuleChain ruleChain = ruleChainService.getRootTenantRuleChain(tenant.getId());
if (ruleChain == null) {
installScripts.createDefaultRuleChains(tenant.getId());
} else {
RuleChainMetaData md = ruleChainService.loadRuleChainMetaData(tenant.getId(), ruleChain.getId());
int oldIdx = md.getFirstNodeIndex();
int newIdx = md.getNodes().size();
if (md.getNodes().size() < oldIdx) {
// Skip invalid rule chains
return;
}
RuleNode oldFirstNode = md.getNodes().get(oldIdx);
if (oldFirstNode.getType().equals(TbDeviceProfileNode.class.getName())) {
// No need to update the rule node twice.
return;
}
RuleNode ruleNode = new RuleNode();
ruleNode.setRuleChainId(ruleChain.getId());
ruleNode.setName("Device Profile Node");
ruleNode.setType(TbDeviceProfileNode.class.getName());
ruleNode.setDebugMode(false);
TbDeviceProfileNodeConfiguration ruleNodeConfiguration = new TbDeviceProfileNodeConfiguration().defaultConfiguration();
ruleNode.setConfiguration(JacksonUtil.valueToTree(ruleNodeConfiguration));
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put("description", "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.");
additionalInfo.put("layoutX", 204);
additionalInfo.put("layoutY", 240);
ruleNode.setAdditionalInfo(additionalInfo);
md.getNodes().add(ruleNode);
md.setFirstNodeIndex(newIdx);
md.addConnectionInfo(newIdx, oldIdx, "Success");
ruleChainService.saveRuleChainMetaData(tenant.getId(), md);
}
} catch (Exception e) {
log.error("Unable to update Tenant", e);
}
}
};
private PaginatedUpdater<String, Tenant> tenantsEntityViewsUpdater =
new PaginatedUpdater<String, Tenant>() {
@ -121,30 +189,30 @@ public class DefaultDataUpdateService implements DataUpdateService {
}
};
private void updateTenantEntityViews(TenantId tenantId) {
PageLink pageLink = new PageLink(100);
PageData<EntityView> pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
boolean hasNext = true;
while (hasNext) {
List<ListenableFuture<List<Void>>> updateFutures = new ArrayList<>();
for (EntityView entityView : pageData.getData()) {
updateFutures.add(updateEntityViewLatestTelemetry(entityView));
}
private void updateTenantEntityViews(TenantId tenantId) {
PageLink pageLink = new PageLink(100);
PageData<EntityView> pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
boolean hasNext = true;
while (hasNext) {
List<ListenableFuture<List<Void>>> updateFutures = new ArrayList<>();
for (EntityView entityView : pageData.getData()) {
updateFutures.add(updateEntityViewLatestTelemetry(entityView));
}
try {
Futures.allAsList(updateFutures).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to copy latest telemetry to entity view", e);
}
try {
Futures.allAsList(updateFutures).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to copy latest telemetry to entity view", e);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
} else {
hasNext = false;
}
}
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
pageData = entityViewService.findEntityViewByTenantId(tenantId, pageLink);
} else {
hasNext = false;
}
}
}
private ListenableFuture<List<Void>> updateEntityViewLatestTelemetry(EntityView entityView) {
EntityViewId entityId = entityView.getId();
@ -160,13 +228,13 @@ public class DefaultDataUpdateService implements DataUpdateService {
keysFuture = Futures.immediateFuture(keys);
}
ListenableFuture<List<TsKvEntry>> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> {
List<ReadTsKvQuery> queries = fetchKeys.stream().filter(key -> !isBlank(key)).map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
if (!queries.isEmpty()) {
return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
List<ReadTsKvQuery> queries = fetchKeys.stream().filter(key -> !isBlank(key)).map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
if (!queries.isEmpty()) {
return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
} else {
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
return Futures.transformAsync(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues);

View File

@ -467,9 +467,20 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
ctx.addStringParameter("where_relation_type", entityFilter.getRelationType());
whereFilter += " re.relation_type = :where_relation_type AND";
}
String toOrFrom = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from");
whereFilter += " re." + (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from") + "_type = :where_entity_type";
if (entityFilter.isFetchLastLevelOnly()) {
whereFilter += " and re.lvl = " + entityFilter.getMaxLevel();
String fromOrTo = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "from" : "to");
StringBuilder notExistsPart = new StringBuilder();
notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr where ")
.append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
.append(" and ")
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
if (!StringUtils.isEmpty(entityFilter.getRelationType())) {
notExistsPart.append(" and nr.relation_type = :where_relation_type");
}
notExistsPart.append(")");
whereFilter += " and ( re.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")";
}
from = String.format(from, lvlFilter, whereFilter);
String query = "( " + selectFields + from + ")";
@ -502,14 +513,13 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
StringBuilder whereFilter = new StringBuilder();
boolean noConditions = true;
boolean single = entityFilter.getFilters() != null && entityFilter.getFilters().size() == 1;
if (entityFilter.getFilters() != null && !entityFilter.getFilters().isEmpty()) {
boolean single = entityFilter.getFilters().size() == 1;
int entityTypeFilterIdx = 0;
for (EntityTypeFilter etf : entityFilter.getFilters()) {
String etfCondition = buildEtfCondition(ctx, etf, entityFilter.getDirection(), entityTypeFilterIdx++);
if (!etfCondition.isEmpty()) {
if (noConditions) {
whereFilter.append(" WHERE ");
noConditions = false;
} else {
whereFilter.append(" OR ");
@ -525,15 +535,33 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
}
}
if (noConditions) {
whereFilter.append(" WHERE re.")
whereFilter.append(" re.")
.append(entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from")
.append("_type in (:where_entity_types").append(")");
ctx.addStringListParameter("where_entity_types", Arrays.stream(RELATION_QUERY_ENTITY_TYPES).map(EntityType::name).collect(Collectors.toList()));
}
if (entityFilter.isFetchLastLevelOnly()) {
whereFilter.append(" and re.lvl = ").append(entityFilter.getMaxLevel());
if (!noConditions && !single) {
whereFilter = new StringBuilder().append("(").append(whereFilter).append(")");
}
from = String.format(from, lvlFilter, whereFilter);
if (entityFilter.isFetchLastLevelOnly()) {
String toOrFrom = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "to" : "from");
String fromOrTo = (entityFilter.getDirection().equals(EntitySearchDirection.FROM) ? "from" : "to");
StringBuilder notExistsPart = new StringBuilder();
notExistsPart.append(" NOT EXISTS (SELECT 1 from relation nr WHERE ");
notExistsPart.append(whereFilter.toString());
notExistsPart
.append(" and ")
.append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
.append(" and ")
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
notExistsPart.append(")");
whereFilter.append(" and ( re.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")");
}
from = String.format(from, lvlFilter, " WHERE " + whereFilter);
return "( " + selectFields + from + ")";
}

View File

@ -18,7 +18,7 @@ package org.thingsboard.server.dao.util.mapping;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.thingsboard.server.common.data.alarm.Alarm;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
@ -66,12 +66,16 @@ public class JacksonUtil {
throw new IllegalArgumentException(e);
}
}
public static ObjectNode newObjectNode(){
return OBJECT_MAPPER.createObjectNode();
}
public static <T> T clone(T value) {
return fromString(toString(value), (Class<T>) value.getClass());
}
public static <T> JsonNode valueToTree(T alarm) {
return OBJECT_MAPPER.valueToTree(alarm);
public static <T> JsonNode valueToTree(T value) {
return OBJECT_MAPPER.valueToTree(value);
}
}