Merge remote-tracking branch 'origin/master' into feature/check_tenant_entity_for_nodes

This commit is contained in:
Andrii Shvaika 2022-11-04 15:07:01 +02:00
commit 1e2abbcedf
9 changed files with 506 additions and 117 deletions

View File

@ -15,7 +15,9 @@
*/
package org.thingsboard.common.util;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonWriteFeature;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
@ -46,6 +48,10 @@ public class JacksonUtil {
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
.build();
public static ObjectMapper ALLOW_UNQUOTED_FIELD_NAMES_MAPPER = JsonMapper.builder()
.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false)
.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
.build();
public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
try {
@ -127,18 +133,26 @@ public class JacksonUtil {
}
public static JsonNode toJsonNode(String value) {
return toJsonNode(value, OBJECT_MAPPER);
}
public static JsonNode toJsonNode(String value, ObjectMapper mapper) {
if (value == null || value.isEmpty()) {
return null;
}
try {
return OBJECT_MAPPER.readTree(value);
return mapper.readTree(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
public static ObjectNode newObjectNode() {
return OBJECT_MAPPER.createObjectNode();
return newObjectNode(OBJECT_MAPPER);
}
public static ObjectNode newObjectNode(ObjectMapper mapper) {
return mapper.createObjectNode();
}
public static <T> T clone(T value) {
@ -216,18 +230,27 @@ public class JacksonUtil {
}
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
addKvEntry(entityNode, kvEntry, kvEntry.getKey());
}
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String key) {
addKvEntry(entityNode, kvEntry, key, OBJECT_MAPPER);
}
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String key, ObjectMapper mapper) {
if (kvEntry.getDataType() == DataType.BOOLEAN) {
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(key, value));
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(key, value));
} else if (kvEntry.getDataType() == DataType.LONG) {
kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
kvEntry.getLongValue().ifPresent(value -> entityNode.put(key, value));
} else if (kvEntry.getDataType() == DataType.JSON) {
if (kvEntry.getJsonValue().isPresent()) {
entityNode.set(kvEntry.getKey(), JacksonUtil.toJsonNode(kvEntry.getJsonValue().get()));
entityNode.set(key, toJsonNode(kvEntry.getJsonValue().get(), mapper));
}
} else {
entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
entityNode.put(key, kvEntry.getValueAsString());
}
}
}

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test;
public class JacksonUtilTest {
@Test
public void allow_unquoted_field_mapper_test() {
String data = "{data: 123}";
JsonNode actualResult = JacksonUtil.toJsonNode(data, JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); // should be: {"data": 123}
ObjectNode expectedResult = JacksonUtil.newObjectNode();
expectedResult.put("data", 123); // {"data": 123}
Assert.assertEquals(expectedResult, actualResult);
Assert.assertThrows(IllegalArgumentException.class, () -> JacksonUtil.toJsonNode(data)); // syntax exception due to missing quotes in the field name!
}
}

View File

@ -15,16 +15,15 @@
*/
package org.thingsboard.rule.engine.metadata;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.json.JsonWriteFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
@ -32,13 +31,18 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ -51,18 +55,20 @@ import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
private static ObjectMapper mapper = new ObjectMapper();
private static final String VALUE = "value";
private static final String TS = "ts";
protected C config;
private boolean fetchToData;
private boolean isTellFailureIfAbsent;
private boolean getLatestValueWithTs;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = loadGetAttributesNodeConfig(configuration);
mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
this.fetchToData = config.isFetchToData();
this.getLatestValueWithTs = config.isGetLatestValueWithTs();
this.isTellFailureIfAbsent = BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true);
}
protected abstract C loadGetAttributesNodeConfig(TbNodeConfiguration configuration) throws TbNodeException;
@ -86,97 +92,110 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
ctx.tellNext(msg, FAILURE);
return;
}
JsonNode msgDataNode;
if (fetchToData) {
msgDataNode = JacksonUtil.toJsonNode(msg.getData());
if (!msgDataNode.isObject()) {
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is not an object!"));
return;
}
} else {
msgDataNode = null;
}
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
ListenableFuture<List<Void>> allFutures = Futures.allAsList(
putLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
putAttrAsync(ctx, entityId, msg, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap, "cs_"),
putAttrAsync(ctx, entityId, msg, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap, "shared_"),
putAttrAsync(ctx, entityId, msg, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap, "ss_")
ListenableFuture<List<Map<String, ? extends List<? extends KvEntry>>>> allFutures = Futures.allAsList(
getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap)
);
withCallback(allFutures, i -> {
withCallback(allFutures, futuresList -> {
if (!failuresMap.isEmpty()) {
throw reportFailures(failuresMap);
}
ctx.tellSuccess(msg);
TbMsgMetaData msgMetaData = msg.getMetaData().copy();
futuresList.stream().filter(Objects::nonNull).forEach(kvEntriesMap -> {
kvEntriesMap.forEach((keyScope, kvEntryList) -> {
String prefix = getPrefix(keyScope);
kvEntryList.forEach(kvEntry -> {
String key = prefix + kvEntry.getKey();
if (fetchToData) {
JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, key);
} else {
msgMetaData.putValue(key, kvEntry.getValueAsString());
}
});
});
});
if (fetchToData) {
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, JacksonUtil.toString(msgDataNode)));
} else {
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, msg.getData()));
}
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
}
private ListenableFuture<Void> putAttrAsync(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap, String prefix) {
private ListenableFuture<Map<String, List<AttributeKvEntry>>> getAttrAsync(TbContext ctx, EntityId entityId, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
if (CollectionUtils.isEmpty(keys)) {
return Futures.immediateFuture(null);
}
ListenableFuture<List<AttributeKvEntry>> attributeKvEntryListFuture = ctx.getAttributesService().find(ctx.getTenantId(), entityId, scope, keys);
return Futures.transform(attributeKvEntryListFuture, attributeKvEntryList -> {
if (!CollectionUtils.isEmpty(attributeKvEntryList)) {
List<AttributeKvEntry> existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList());
existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()));
if (existingAttributesKvEntry.size() != keys.size() && BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
getNotExistingKeys(existingAttributesKvEntry, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key));
}
} else {
if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
keys.forEach(key -> computeFailuresMap(scope, failuresMap, key));
}
if (isTellFailureIfAbsent && attributeKvEntryList.size() != keys.size()) {
getNotExistingKeys(attributeKvEntryList, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key));
}
return null;
Map<String, List<AttributeKvEntry>> mapAttributeKvEntry = new HashMap<>();
mapAttributeKvEntry.put(scope, attributeKvEntryList);
return mapAttributeKvEntry;
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
private ListenableFuture<Map<String, List<TsKvEntry>>> getLatestTelemetry(TbContext ctx, EntityId entityId, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
if (CollectionUtils.isEmpty(keys)) {
return Futures.immediateFuture(null);
}
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, keys);
return Futures.transform(latest, l -> {
l.forEach(r -> {
boolean getLatestValueWithTs = BooleanUtils.toBooleanDefaultIfNull(this.config.isGetLatestValueWithTs(), false);
if (BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true)) {
if (r.getValue() == null) {
computeFailuresMap(scope, failuresMap, r.getKey());
} else if (getLatestValueWithTs) {
putValueWithTs(msg, r);
} else {
msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
ListenableFuture<List<TsKvEntry>> latestTelemetryFutures = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, keys);
return Futures.transform(latestTelemetryFutures, tsKvEntries -> {
List<TsKvEntry> listTsKvEntry = new ArrayList<>();
tsKvEntries.forEach(tsKvEntry -> {
if (tsKvEntry.getValue() == null) {
if (isTellFailureIfAbsent) {
computeFailuresMap(LATEST_TS, failuresMap, tsKvEntry.getKey());
}
} else if (getLatestValueWithTs) {
listTsKvEntry.add(getValueWithTs(tsKvEntry));
} else {
if (r.getValue() != null) {
if (getLatestValueWithTs) {
putValueWithTs(msg, r);
} else {
msg.getMetaData().putValue(r.getKey(), r.getValueAsString());
}
}
listTsKvEntry.add(new BasicTsKvEntry(tsKvEntry.getTs(), tsKvEntry));
}
});
return null;
Map<String, List<TsKvEntry>> mapTsKvEntry = new HashMap<>();
mapTsKvEntry.put(LATEST_TS, listTsKvEntry);
return mapTsKvEntry;
}, MoreExecutors.directExecutor());
}
private void putValueWithTs(TbMsg msg, TsKvEntry r) {
ObjectNode value = mapper.createObjectNode();
value.put(TS, r.getTs());
switch (r.getDataType()) {
case STRING:
value.put(VALUE, r.getValueAsString());
private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) {
ObjectMapper mapper = fetchToData ? JacksonUtil.OBJECT_MAPPER : JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER;
ObjectNode value = JacksonUtil.newObjectNode(mapper);
value.put(TS, tsKvEntry.getTs());
JacksonUtil.addKvEntry(value, tsKvEntry, VALUE, mapper);
return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString()));
}
private String getPrefix(String scope) {
String prefix = "";
switch (scope) {
case CLIENT_SCOPE:
prefix = "cs_";
break;
case LONG:
value.put(VALUE, r.getLongValue().get());
case SHARED_SCOPE:
prefix = "shared_";
break;
case BOOLEAN:
value.put(VALUE, r.getBooleanValue().get());
break;
case DOUBLE:
value.put(VALUE, r.getDoubleValue().get());
break;
case JSON:
try {
value.set(VALUE, mapper.readTree(r.getJsonValue().get()));
} catch (IOException e) {
throw new JsonParseException("Can't parse jsonValue: " + r.getJsonValue().get(), e);
}
case SERVER_SCOPE:
prefix = "ss_";
break;
}
msg.getMetaData().putValue(r.getKey(), value.toString());
return prefix;
}
private List<String> getNotExistingKeys(List<AttributeKvEntry> existingAttributesKvEntry, List<String> allKeys) {

View File

@ -34,9 +34,9 @@ import org.thingsboard.server.common.msg.TbMsg;
@RuleNode(type = ComponentType.ENRICHMENT,
name = "originator attributes",
configClazz = TbGetAttributesNodeConfiguration.class,
nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " +
"with specific prefix: <i>cs/shared/ss</i>. Latest telemetry value added into metadata without prefix. " +
nodeDescription = "Add Message Originator Attributes or Latest Telemetry into Message Data or Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message data/metadata " +
"with specific prefix: <i>cs/shared/ss</i>. Latest telemetry value added into Message data/metadata without prefix. " +
"To access those attributes in other nodes this template can be used " +
"<code>metadata.cs_temperature</code> or <code>metadata.shared_limit</code> ",
uiResources = {"static/rulenode/rulenode-core-config.js"},

View File

@ -35,6 +35,7 @@ public class TbGetAttributesNodeConfiguration implements NodeConfiguration<TbGet
private boolean tellFailureIfAbsent;
private boolean getLatestValueWithTs;
private boolean fetchToData;
@Override
public TbGetAttributesNodeConfiguration defaultConfiguration() {
@ -45,6 +46,7 @@ public class TbGetAttributesNodeConfiguration implements NodeConfiguration<TbGet
configuration.setLatestTsKeyNames(Collections.emptyList());
configuration.setTellFailureIfAbsent(true);
configuration.setGetLatestValueWithTs(false);
configuration.setFetchToData(false);
return configuration;
}
}

View File

@ -31,9 +31,9 @@ import org.thingsboard.server.common.msg.TbMsg;
@RuleNode(type = ComponentType.ENRICHMENT,
name = "related device attributes",
configClazz = TbGetDeviceAttrNodeConfiguration.class,
nodeDescription = "Add Originators Related Device Attributes and Latest Telemetry value into Message Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message metadata " +
"with specific prefix: <i>cs/shared/ss</i>. Latest telemetry value added into metadata without prefix. " +
nodeDescription = "Add Originators Related Device Attributes and Latest Telemetry value into Message Data or Metadata",
nodeDetails = "If Attributes enrichment configured, <b>CLIENT/SHARED/SERVER</b> attributes are added into Message data/metadata " +
"with specific prefix: <i>cs/shared/ss</i>. Latest telemetry value added into Message data/metadata without prefix. " +
"To access those attributes in other nodes this template can be used " +
"<code>metadata.cs_temperature</code> or <code>metadata.shared_limit</code> ",
uiResources = {"static/rulenode/rulenode-core-config.js"},

View File

@ -36,6 +36,7 @@ public class TbGetDeviceAttrNodeConfiguration extends TbGetAttributesNodeConfigu
configuration.setLatestTsKeyNames(Collections.emptyList());
configuration.setTellFailureIfAbsent(true);
configuration.setGetLatestValueWithTs(false);
configuration.setFetchToData(false);
DeviceRelationsQuery deviceRelationsQuery = new DeviceRelationsQuery();
deviceRelationsQuery.setDirection(EntitySearchDirection.FROM);

View File

@ -15,25 +15,22 @@
*/
package org.thingsboard.rule.engine.metadata;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.json.JsonWriteFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.JsonParseException;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
@ -41,7 +38,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -75,7 +71,6 @@ public class TbGetTelemetryNode implements TbNode {
private TbGetTelemetryNodeConfiguration config;
private List<String> tsKeyNames;
private int limit;
private ObjectMapper mapper;
private String fetchMode;
private String orderByFetchAll;
private Aggregation aggregation;
@ -91,10 +86,6 @@ public class TbGetTelemetryNode implements TbNode {
orderByFetchAll = ASC_ORDER;
}
aggregation = parseAggregationConfig(config.getAggregation());
mapper = new ObjectMapper();
mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
}
Aggregation parseAggregationConfig(String aggName) {
@ -146,7 +137,7 @@ public class TbGetTelemetryNode implements TbNode {
}
private void process(List<TsKvEntry> entries, TbMsg msg, List<String> keys) {
ObjectNode resultNode = mapper.createObjectNode();
ObjectNode resultNode = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
if (FETCH_MODE_ALL.equals(fetchMode)) {
entries.forEach(entry -> processArray(resultNode, entry));
} else {
@ -169,36 +160,16 @@ public class TbGetTelemetryNode implements TbNode {
ArrayNode arrayNode = (ArrayNode) node.get(entry.getKey());
arrayNode.add(buildNode(entry));
} else {
ArrayNode arrayNode = mapper.createArrayNode();
ArrayNode arrayNode = JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER.createArrayNode();
arrayNode.add(buildNode(entry));
node.set(entry.getKey(), arrayNode);
}
}
private ObjectNode buildNode(TsKvEntry entry) {
ObjectNode obj = mapper.createObjectNode()
.put("ts", entry.getTs());
switch (entry.getDataType()) {
case STRING:
obj.put("value", entry.getValueAsString());
break;
case LONG:
obj.put("value", entry.getLongValue().get());
break;
case BOOLEAN:
obj.put("value", entry.getBooleanValue().get());
break;
case DOUBLE:
obj.put("value", entry.getDoubleValue().get());
break;
case JSON:
try {
obj.set("value", mapper.readTree(entry.getJsonValue().get()));
} catch (IOException e) {
throw new JsonParseException("Can't parse jsonValue: " + entry.getJsonValue().get(), e);
}
break;
}
ObjectNode obj = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
obj.put("ts", entry.getTs());
JacksonUtil.addKvEntry(obj, entry, "value", JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
return obj;
}

View File

@ -0,0 +1,338 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.metadata;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
@RunWith(MockitoJUnitRunner.class)
public class TbAbstractGetAttributesNodeTest {
final ObjectMapper mapper = new ObjectMapper();
private EntityId originator = new DeviceId(Uuids.timeBased());
private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
@Mock
private TbContext ctx;
@Mock
private AttributesService attributesService;
@Mock
private TimeseriesService tsService;
private AbstractListeningExecutor dbExecutor;
private List<String> clientAttributes;
private List<String> serverAttributes;
private List<String> sharedAttributes;
private List<String> tsKeys;
private long ts;
@Before
public void before() throws TbNodeException {
dbExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 3;
}
};
dbExecutor.init();
Mockito.reset(ctx);
Mockito.reset(attributesService);
Mockito.reset(tsService);
Mockito.reset(ctx);
Mockito.reset(attributesService);
Mockito.reset(tsService);
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
lenient().when(ctx.getTimeseriesService()).thenReturn(tsService);
lenient().when(ctx.getTenantId()).thenReturn(tenantId);
lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
clientAttributes = getAttributeNames("client");
serverAttributes = getAttributeNames("server");
sharedAttributes = getAttributeNames("shared");
tsKeys = List.of("temperature", "humidity", "unknown");
ts = System.currentTimeMillis();
Mockito.when(attributesService.find(tenantId, originator, DataConstants.CLIENT_SCOPE, clientAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(clientAttributes, ts)));
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, serverAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(serverAttributes, ts)));
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SHARED_SCOPE, sharedAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(sharedAttributes, ts)));
Mockito.when(tsService.findLatest(tenantId, originator, tsKeys))
.thenReturn(Futures.immediateFuture(getListTsKvEntry(tsKeys, ts)));
}
@After
public void after() {
dbExecutor.destroy();
}
@Test
public void fetchToMetadata_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(false, false, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
//check attributes
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
//check timeseries
checkTs(tsKeys, false, false, msgMetaData, null);
}
@Test
public void fetchToMetadata_latestWithTs_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(false, true, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
//check attributes
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
//check timeseries with ts
checkTs(tsKeys, false, true, msgMetaData, null);
}
@Test
public void fetchToData_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(true, false, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
//check attributes
checkAttributes(clientAttributes, "cs_", true, null, msgData);
checkAttributes(serverAttributes, "ss_", true, null, msgData);
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
//check timeseries
checkTs(tsKeys, true, false, null, msgData);
}
@Test
public void fetchToData_latestWithTs_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(true, true, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
//check attributes
checkAttributes(clientAttributes, "cs_", true, null, msgData);
checkAttributes(serverAttributes, "ss_", true, null, msgData);
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
//check timeseries with ts
checkTs(tsKeys, true, true, null, msgData);
}
@Test
public void fetchToData_whenOnMsg_then_failure() throws Exception {
TbGetAttributesNode node = initNode(true, true, true);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Mockito.verify(ctx, never()).tellSuccess(any());
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());
Assert.assertSame(newMsgCaptor.getValue(), msg);
Assert.assertNotNull(exceptionCaptor.getValue());
}
@Test
public void fetchToData_whenOnMsg_then_data_not_object_failure() throws Exception {
TbGetAttributesNode node = initNode(true, true, true);
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), "[]");
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Mockito.verify(ctx, never()).tellSuccess(any());
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());
Assert.assertSame(newMsgCaptor.getValue(), msg);
Assert.assertNotNull(exceptionCaptor.getValue());
}
private TbMsg checkMsg() {
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
TbMsg resultMsg = msgCaptor.getValue();
Assert.assertNotNull(resultMsg);
Assert.assertNotNull(resultMsg.getMetaData());
Assert.assertNotNull(resultMsg.getData());
return resultMsg;
}
private void checkAttributes(List<String> attributes, String prefix, boolean fetchToData, TbMsgMetaData msgMetaData, JsonNode msgData) {
attributes.stream()
.filter(attribute -> !attribute.equals("unknown"))
.forEach(attribute -> {
String result;
if (fetchToData) {
result = msgData.get(prefix + attribute).asText();
} else {
result = msgMetaData.getValue(prefix + attribute);
}
Assert.assertNotNull(result);
Assert.assertEquals(attribute + "_value", result);
});
}
private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) {
long value = 1L;
for (String key : tsKeys) {
if (key.equals("unknown")) {
continue;
}
String actualValue;
String expectedValue;
if (getLatestValueWithTs) {
expectedValue = "{\"ts\":" + ts + ",\"value\":{\"data\":" + value + "}}";
} else {
expectedValue = "{\"data\":" + value + "}";
}
if (fetchToData) {
actualValue = JacksonUtil.toString(msgData.get(key));
} else {
actualValue = msgMetaData.getValue(key);
}
Assert.assertNotNull(actualValue);
Assert.assertEquals(expectedValue, actualValue);
value++;
}
}
private TbGetAttributesNode initNode(boolean fetchToData, boolean getLatestValueWithTs, boolean isTellFailureIfAbsent) throws TbNodeException {
TbGetAttributesNodeConfiguration config = new TbGetAttributesNodeConfiguration();
config.setClientAttributeNames(List.of("client_attr_1", "client_attr_2", "${client_attr_metadata}", "unknown"));
config.setServerAttributeNames(List.of("server_attr_1", "server_attr_2", "${server_attr_metadata}", "unknown"));
config.setSharedAttributeNames(List.of("shared_attr_1", "shared_attr_2", "$[shared_attr_data]", "unknown"));
config.setLatestTsKeyNames(List.of("temperature", "humidity", "unknown"));
config.setFetchToData(fetchToData);
config.setGetLatestValueWithTs(getLatestValueWithTs);
config.setTellFailureIfAbsent(isTellFailureIfAbsent);
TbNodeConfiguration nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
TbGetAttributesNode node = new TbGetAttributesNode();
node.init(ctx, nodeConfiguration);
return node;
}
private TbMsg getTbMsg(EntityId entityId) {
ObjectNode msgData = JacksonUtil.newObjectNode();
msgData.put("shared_attr_data", "shared_attr_3");
TbMsgMetaData msgMetaData = new TbMsgMetaData();
msgMetaData.putValue("client_attr_metadata", "client_attr_3");
msgMetaData.putValue("server_attr_metadata", "server_attr_3");
return TbMsg.newMsg("TEST", entityId, msgMetaData, msgData.toString());
}
private List<String> getAttributeNames(String prefix) {
return List.of(prefix + "_attr_1", prefix + "_attr_2", prefix + "_attr_3", "unknown");
}
private List<AttributeKvEntry> getListAttributeKvEntry(List<String> attributes, long ts) {
return attributes.stream()
.filter(attribute -> !attribute.equals("unknown"))
.map(attribute -> toAttributeKvEntry(ts, attribute))
.collect(Collectors.toList());
}
private BaseAttributeKvEntry toAttributeKvEntry(long ts, String attribute) {
return new BaseAttributeKvEntry(ts, new StringDataEntry(attribute, attribute + "_value"));
}
private List<TsKvEntry> getListTsKvEntry(List<String> keys, long ts) {
long value = 1L;
List<TsKvEntry> kvEntries = new ArrayList<>();
for (String key : keys) {
if (key.equals("unknown")) {
continue;
}
String dataValue = "{\"data\":" + value + "}";
kvEntries.add(new BasicTsKvEntry(ts, new JsonDataEntry(key, dataValue)));
value++;
}
return kvEntries;
}
}