Merge pull request #9 from ShvaykaD/feature/node_originator_attributes_fetch_to_msg_data

added JacksonUtilTest for ALLOW_UNQUOTED_FIELD_NAMES_MAPPER and updat…
This commit is contained in:
Yurii 2022-11-03 12:34:53 +02:00 committed by GitHub
commit 7be1cc74a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 38 deletions

View File

@ -229,6 +229,10 @@ public class JacksonUtil {
} }
} }
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
addKvEntry(entityNode, kvEntry, kvEntry.getKey());
}
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String key) { public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String key) {
addKvEntry(entityNode, kvEntry, key, OBJECT_MAPPER); addKvEntry(entityNode, kvEntry, key, OBJECT_MAPPER);
} }
@ -249,7 +253,4 @@ public class JacksonUtil {
} }
} }
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
addKvEntry(entityNode, kvEntry, kvEntry.getKey());
}
} }

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Assert;
import org.junit.Test;
public class JacksonUtilTest {
@Test
public void allow_unquoted_field_mapper_test() {
String data = "{data: 123}";
JsonNode actualResult = JacksonUtil.toJsonNode(data, JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); // should be: {"data": 123}
ObjectNode expectedResult = JacksonUtil.newObjectNode();
expectedResult.put("data", 123); // {"data": 123}
Assert.assertEquals(expectedResult, actualResult);
Assert.assertThrows(IllegalArgumentException.class, () -> JacksonUtil.toJsonNode(data)); // syntax exception due to missing quotes in the field name!
}
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.metadata; package org.thingsboard.rule.engine.metadata;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -91,16 +92,19 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
ctx.tellNext(msg, FAILURE); ctx.tellNext(msg, FAILURE);
return; return;
} }
JsonNode msgDataNode = JacksonUtil.toJsonNode(msg.getData(), JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); JsonNode msgDataNode;
if (fetchToData) { if (fetchToData) {
msgDataNode = JacksonUtil.toJsonNode(msg.getData());
if (!msgDataNode.isObject()) { if (!msgDataNode.isObject()) {
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is not an object!")); ctx.tellFailure(msg, new IllegalArgumentException("Msg body is not an object!"));
return; return;
} }
} else {
msgDataNode = null;
} }
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>(); ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
ListenableFuture<List<Map<String, ? extends List<? extends KvEntry>>>> allFutures = Futures.allAsList( ListenableFuture<List<Map<String, ? extends List<? extends KvEntry>>>> allFutures = Futures.allAsList(
getLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap), getLatestTelemetry(ctx, entityId, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap), getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap), getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap),
getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap) getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap)
@ -114,10 +118,11 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
kvEntriesMap.forEach((keyScope, kvEntryList) -> { kvEntriesMap.forEach((keyScope, kvEntryList) -> {
String prefix = getPrefix(keyScope); String prefix = getPrefix(keyScope);
kvEntryList.forEach(kvEntry -> { kvEntryList.forEach(kvEntry -> {
String key = prefix + kvEntry.getKey();
if (fetchToData) { if (fetchToData) {
JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, prefix + kvEntry.getKey(), JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, key);
} else { } else {
msgMetaData.putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()); msgMetaData.putValue(key, kvEntry.getValueAsString());
} }
}); });
}); });
@ -145,7 +150,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
private ListenableFuture<Map<String, List<TsKvEntry>>> getLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) { private ListenableFuture<Map<String, List<TsKvEntry>>> getLatestTelemetry(TbContext ctx, EntityId entityId, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
if (CollectionUtils.isEmpty(keys)) { if (CollectionUtils.isEmpty(keys)) {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
@ -155,7 +160,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
tsKvEntries.forEach(tsKvEntry -> { tsKvEntries.forEach(tsKvEntry -> {
if (tsKvEntry.getValue() == null) { if (tsKvEntry.getValue() == null) {
if (isTellFailureIfAbsent) { if (isTellFailureIfAbsent) {
computeFailuresMap(scope, failuresMap, tsKvEntry.getKey()); computeFailuresMap(LATEST_TS, failuresMap, tsKvEntry.getKey());
} }
} else if (getLatestValueWithTs) { } else if (getLatestValueWithTs) {
listTsKvEntry.add(getValueWithTs(tsKvEntry)); listTsKvEntry.add(getValueWithTs(tsKvEntry));
@ -164,7 +169,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
} }
}); });
Map<String, List<TsKvEntry>> mapTsKvEntry = new HashMap<>(); Map<String, List<TsKvEntry>> mapTsKvEntry = new HashMap<>();
mapTsKvEntry.put(scope, listTsKvEntry); mapTsKvEntry.put(LATEST_TS, listTsKvEntry);
return mapTsKvEntry; return mapTsKvEntry;
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
@ -172,7 +177,8 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) { private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) {
ObjectNode value = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); ObjectNode value = JacksonUtil.newObjectNode(JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER);
value.put(TS, tsKvEntry.getTs()); value.put(TS, tsKvEntry.getTs());
JacksonUtil.addKvEntry(value, tsKvEntry, VALUE, JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER); ObjectMapper mapper = fetchToData ? JacksonUtil.OBJECT_MAPPER : JacksonUtil.ALLOW_UNQUOTED_FIELD_NAMES_MAPPER;
JacksonUtil.addKvEntry(value, tsKvEntry, VALUE, mapper);
return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString())); return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString()));
} }

View File

@ -41,7 +41,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -51,6 +51,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.lenient;
@ -76,6 +77,7 @@ public class TbAbstractGetAttributesNodeTest {
private List<String> serverAttributes; private List<String> serverAttributes;
private List<String> sharedAttributes; private List<String> sharedAttributes;
private List<String> tsKeys; private List<String> tsKeys;
private long ts;
@Before @Before
public void before() throws TbNodeException { public void before() throws TbNodeException {
@ -104,20 +106,21 @@ public class TbAbstractGetAttributesNodeTest {
serverAttributes = getAttributeNames("server"); serverAttributes = getAttributeNames("server");
sharedAttributes = getAttributeNames("shared"); sharedAttributes = getAttributeNames("shared");
tsKeys = List.of("temperature", "humidity", "unknown"); tsKeys = List.of("temperature", "humidity", "unknown");
ts = System.currentTimeMillis();
Mockito.when(attributesService.find(tenantId, originator, DataConstants.CLIENT_SCOPE, clientAttributes)) Mockito.when(attributesService.find(tenantId, originator, DataConstants.CLIENT_SCOPE, clientAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(clientAttributes))); .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(clientAttributes, ts)));
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, serverAttributes)) Mockito.when(attributesService.find(tenantId, originator, DataConstants.SERVER_SCOPE, serverAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(serverAttributes))); .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(serverAttributes, ts)));
Mockito.when(attributesService.find(tenantId, originator, DataConstants.SHARED_SCOPE, sharedAttributes)) Mockito.when(attributesService.find(tenantId, originator, DataConstants.SHARED_SCOPE, sharedAttributes))
.thenReturn(Futures.immediateFuture(getListAttributeKvEntry(sharedAttributes))); .thenReturn(Futures.immediateFuture(getListAttributeKvEntry(sharedAttributes, ts)));
Mockito.when(tsService.findLatest(tenantId, originator, tsKeys)) Mockito.when(tsService.findLatest(tenantId, originator, tsKeys))
.thenReturn(Futures.immediateFuture(getListTelemetryKvEntry(tsKeys))); .thenReturn(Futures.immediateFuture(getListTsKvEntry(tsKeys, ts)));
} }
@After @After
@ -143,8 +146,44 @@ public class TbAbstractGetAttributesNodeTest {
checkTs(tsKeys, false, false, msgMetaData, null); checkTs(tsKeys, false, false, msgMetaData, null);
} }
@Test
public void fetchToMetadata_latestWithTs_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(false, true, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
//check attributes
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
//check timeseries with ts
checkTs(tsKeys, false, true, msgMetaData, null);
}
@Test @Test
public void fetchToData_whenOnMsg_then_success() throws Exception { public void fetchToData_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(true, false, false);
TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg);
TbMsg resultMsg = checkMsg();
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
//check attributes
checkAttributes(clientAttributes, "cs_", true, null, msgData);
checkAttributes(serverAttributes, "ss_", true, null, msgData);
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
//check timeseries
checkTs(tsKeys, true, false, null, msgData);
}
@Test
public void fetchToData_latestWithTs_whenOnMsg_then_success() throws Exception {
TbGetAttributesNode node = initNode(true, true, false); TbGetAttributesNode node = initNode(true, true, false);
TbMsg msg = getTbMsg(originator); TbMsg msg = getTbMsg(originator);
node.onMsg(ctx, msg); node.onMsg(ctx, msg);
@ -218,28 +257,26 @@ public class TbAbstractGetAttributesNodeTest {
} }
private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) { private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) {
long tsValue = 1L; long value = 1L;
for (String key : tsKeys) { for (String key : tsKeys) {
if (key.equals("unknown")) { if (key.equals("unknown")) {
continue; continue;
} }
String result; String actualValue;
if (fetchToData) { String expectedValue;
if (getLatestValueWithTs) { if (getLatestValueWithTs) {
JsonNode resultTs = msgData.get(key); expectedValue = "{\"ts\":" + ts + ",\"value\":{\"data\":" + value + "}}";
Assert.assertNotNull(resultTs);
Assert.assertNotNull(resultTs.get("value"));
Assert.assertNotNull(resultTs.get("ts"));
result = resultTs.get("value").asText();
} else { } else {
result = msgData.get(key).asText(); expectedValue = "{\"data\":" + value + "}";
} }
if (fetchToData) {
actualValue = JacksonUtil.toString(msgData.get(key));
} else { } else {
result = msgMetaData.getValue(key); actualValue = msgMetaData.getValue(key);
} }
Assert.assertNotNull(result); Assert.assertNotNull(actualValue);
Assert.assertEquals(String.valueOf(tsValue), result); Assert.assertEquals(expectedValue, actualValue);
tsValue++; value++;
} }
} }
@ -273,20 +310,26 @@ public class TbAbstractGetAttributesNodeTest {
return List.of(prefix + "_attr_1", prefix + "_attr_2", prefix + "_attr_3", "unknown"); return List.of(prefix + "_attr_1", prefix + "_attr_2", prefix + "_attr_3", "unknown");
} }
private List<AttributeKvEntry> getListAttributeKvEntry(List<String> attributes) { private List<AttributeKvEntry> getListAttributeKvEntry(List<String> attributes, long ts) {
List<AttributeKvEntry> attributeKvEntries = new ArrayList<>(); return attributes.stream()
attributes.stream().filter(attribute -> !attribute.equals("unknown")).forEach(attribute -> attributeKvEntries.add(new BaseAttributeKvEntry(System.currentTimeMillis(), new StringDataEntry(attribute, attribute + "_value")))); .filter(attribute -> !attribute.equals("unknown"))
return attributeKvEntries; .map(attribute -> toAttributeKvEntry(ts, attribute))
.collect(Collectors.toList());
} }
private List<TsKvEntry> getListTelemetryKvEntry(List<String> keys) { private BaseAttributeKvEntry toAttributeKvEntry(long ts, String attribute) {
return new BaseAttributeKvEntry(ts, new StringDataEntry(attribute, attribute + "_value"));
}
private List<TsKvEntry> getListTsKvEntry(List<String> keys, long ts) {
long value = 1L; long value = 1L;
List<TsKvEntry> kvEntries = new ArrayList<>(); List<TsKvEntry> kvEntries = new ArrayList<>();
for (String key : keys) { for (String key : keys) {
if (key.equals("unknown")) { if (key.equals("unknown")) {
continue; continue;
} }
kvEntries.add(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))); String dataValue = "{\"data\":" + value + "}";
kvEntries.add(new BasicTsKvEntry(ts, new JsonDataEntry(key, dataValue)));
value++; value++;
} }
return kvEntries; return kvEntries;