code review
This commit is contained in:
parent
ec96062547
commit
0ec08b8448
@ -215,23 +215,23 @@ public class JacksonUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String prefix) {
|
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry, String key) {
|
||||||
if (kvEntry.getDataType() == DataType.BOOLEAN) {
|
if (kvEntry.getDataType() == DataType.BOOLEAN) {
|
||||||
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(prefix + kvEntry.getKey(), value));
|
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(key, value));
|
||||||
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
|
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
|
||||||
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(prefix + kvEntry.getKey(), value));
|
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(key, value));
|
||||||
} else if (kvEntry.getDataType() == DataType.LONG) {
|
} else if (kvEntry.getDataType() == DataType.LONG) {
|
||||||
kvEntry.getLongValue().ifPresent(value -> entityNode.put(prefix + kvEntry.getKey(), value));
|
kvEntry.getLongValue().ifPresent(value -> entityNode.put(key, value));
|
||||||
} else if (kvEntry.getDataType() == DataType.JSON) {
|
} else if (kvEntry.getDataType() == DataType.JSON) {
|
||||||
if (kvEntry.getJsonValue().isPresent()) {
|
if (kvEntry.getJsonValue().isPresent()) {
|
||||||
entityNode.set(prefix + kvEntry.getKey(), JacksonUtil.valueToTree(kvEntry.getJsonValue().get()));
|
entityNode.set(key, JacksonUtil.valueToTree(kvEntry.getJsonValue().get()));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
entityNode.put(prefix + kvEntry.getKey(), kvEntry.getValueAsString());
|
entityNode.put(key, kvEntry.getValueAsString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
|
public static void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) {
|
||||||
addKvEntry(entityNode, kvEntry, "");
|
addKvEntry(entityNode, kvEntry, kvEntry.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,15 +15,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.metadata;
|
package org.thingsboard.rule.engine.metadata;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonParser;
|
|
||||||
import com.fasterxml.jackson.core.json.JsonWriteFeature;
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.gson.JsonParseException;
|
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang3.BooleanUtils;
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
@ -39,9 +35,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -59,8 +53,6 @@ import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
|
|||||||
|
|
||||||
public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
|
public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
|
||||||
|
|
||||||
private static ObjectMapper mapper = new ObjectMapper();
|
|
||||||
|
|
||||||
private static final String VALUE = "value";
|
private static final String VALUE = "value";
|
||||||
private static final String TS = "ts";
|
private static final String TS = "ts";
|
||||||
|
|
||||||
@ -73,10 +65,8 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
this.config = loadGetAttributesNodeConfig(configuration);
|
this.config = loadGetAttributesNodeConfig(configuration);
|
||||||
this.fetchToData = config.isFetchToData();
|
this.fetchToData = config.isFetchToData();
|
||||||
|
this.getLatestValueWithTs = config.isGetLatestValueWithTs();
|
||||||
this.isTellFailureIfAbsent = BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true);
|
this.isTellFailureIfAbsent = BooleanUtils.toBooleanDefaultIfNull(this.config.isTellFailureIfAbsent(), true);
|
||||||
this.getLatestValueWithTs = BooleanUtils.toBooleanDefaultIfNull(this.config.isGetLatestValueWithTs(), false);
|
|
||||||
mapper.configure(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature(), false);
|
|
||||||
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract C loadGetAttributesNodeConfig(TbNodeConfiguration configuration) throws TbNodeException;
|
protected abstract C loadGetAttributesNodeConfig(TbNodeConfiguration configuration) throws TbNodeException;
|
||||||
@ -108,7 +98,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
|
ConcurrentHashMap<String, List<String>> failuresMap = new ConcurrentHashMap<>();
|
||||||
ListenableFuture<List<Map<String, List<? extends KvEntry>>>> allFutures = Futures.allAsList(
|
ListenableFuture<List<Map<String, ? extends List<? extends KvEntry>>>> allFutures = Futures.allAsList(
|
||||||
getLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
|
getLatestTelemetry(ctx, entityId, msg, LATEST_TS, TbNodeUtils.processPatterns(config.getLatestTsKeyNames(), msg), failuresMap),
|
||||||
getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap),
|
getAttrAsync(ctx, entityId, CLIENT_SCOPE, TbNodeUtils.processPatterns(config.getClientAttributeNames(), msg), failuresMap),
|
||||||
getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap),
|
getAttrAsync(ctx, entityId, SHARED_SCOPE, TbNodeUtils.processPatterns(config.getSharedAttributeNames(), msg), failuresMap),
|
||||||
@ -118,15 +108,14 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
if (!failuresMap.isEmpty()) {
|
if (!failuresMap.isEmpty()) {
|
||||||
throw reportFailures(failuresMap);
|
throw reportFailures(failuresMap);
|
||||||
}
|
}
|
||||||
TbMsgMetaData msgMetaData = msg.getMetaData();
|
|
||||||
futuresList.stream().filter(Objects::nonNull).forEach(kvEntriesMap -> {
|
futuresList.stream().filter(Objects::nonNull).forEach(kvEntriesMap -> {
|
||||||
kvEntriesMap.forEach((keyScope, kvEntryList) -> {
|
kvEntriesMap.forEach((keyScope, kvEntryList) -> {
|
||||||
String prefix = getPrefix(keyScope);
|
String prefix = getPrefix(keyScope);
|
||||||
kvEntryList.forEach(kvEntry -> {
|
kvEntryList.forEach(kvEntry -> {
|
||||||
if (fetchToData) {
|
if (fetchToData) {
|
||||||
JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, prefix);
|
JacksonUtil.addKvEntry((ObjectNode) msgDataNode, kvEntry, prefix + kvEntry.getKey());
|
||||||
} else {
|
} else {
|
||||||
msgMetaData.putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString());
|
msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -139,7 +128,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Map<String, List<? extends KvEntry>>> getAttrAsync(TbContext ctx, EntityId entityId, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
|
private ListenableFuture<Map<String, List<AttributeKvEntry>>> getAttrAsync(TbContext ctx, EntityId entityId, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
|
||||||
if (CollectionUtils.isEmpty(keys)) {
|
if (CollectionUtils.isEmpty(keys)) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
@ -148,13 +137,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
if (isTellFailureIfAbsent && attributeKvEntryList.size() != keys.size()) {
|
if (isTellFailureIfAbsent && attributeKvEntryList.size() != keys.size()) {
|
||||||
getNotExistingKeys(attributeKvEntryList, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key));
|
getNotExistingKeys(attributeKvEntryList, keys).forEach(key -> computeFailuresMap(scope, failuresMap, key));
|
||||||
}
|
}
|
||||||
Map<String, List<? extends KvEntry>> mapAttributeKvEntry = new HashMap<>();
|
Map<String, List<AttributeKvEntry>> mapAttributeKvEntry = new HashMap<>();
|
||||||
mapAttributeKvEntry.put(scope, attributeKvEntryList);
|
mapAttributeKvEntry.put(scope, attributeKvEntryList);
|
||||||
return mapAttributeKvEntry;
|
return mapAttributeKvEntry;
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Map<String, List<? extends KvEntry>>> getLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
|
private ListenableFuture<Map<String, List<TsKvEntry>>> getLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
|
||||||
if (CollectionUtils.isEmpty(keys)) {
|
if (CollectionUtils.isEmpty(keys)) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
@ -172,7 +161,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
listTsKvEntry.add(new BasicTsKvEntry(tsKvEntry.getTs(), tsKvEntry));
|
listTsKvEntry.add(new BasicTsKvEntry(tsKvEntry.getTs(), tsKvEntry));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Map<String, List<? extends KvEntry>> mapTsKvEntry = new HashMap<>();
|
Map<String, List<TsKvEntry>> mapTsKvEntry = new HashMap<>();
|
||||||
mapTsKvEntry.put(scope, listTsKvEntry);
|
mapTsKvEntry.put(scope, listTsKvEntry);
|
||||||
return mapTsKvEntry;
|
return mapTsKvEntry;
|
||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
@ -181,27 +170,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
|||||||
private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) {
|
private TsKvEntry getValueWithTs(TsKvEntry tsKvEntry) {
|
||||||
ObjectNode value = JacksonUtil.newObjectNode();
|
ObjectNode value = JacksonUtil.newObjectNode();
|
||||||
value.put(TS, tsKvEntry.getTs());
|
value.put(TS, tsKvEntry.getTs());
|
||||||
switch (tsKvEntry.getDataType()) {
|
JacksonUtil.addKvEntry(value, tsKvEntry, VALUE);
|
||||||
case STRING:
|
|
||||||
value.put(VALUE, tsKvEntry.getValueAsString());
|
|
||||||
break;
|
|
||||||
case LONG:
|
|
||||||
value.put(VALUE, tsKvEntry.getLongValue().get());
|
|
||||||
break;
|
|
||||||
case BOOLEAN:
|
|
||||||
value.put(VALUE, tsKvEntry.getBooleanValue().get());
|
|
||||||
break;
|
|
||||||
case DOUBLE:
|
|
||||||
value.put(VALUE, tsKvEntry.getDoubleValue().get());
|
|
||||||
break;
|
|
||||||
case JSON:
|
|
||||||
try {
|
|
||||||
value.set(VALUE, mapper.readTree(tsKvEntry.getJsonValue().get()));
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new JsonParseException("Can't parse jsonValue: " + tsKvEntry.getJsonValue().get(), e);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString()));
|
return new BasicTsKvEntry(tsKvEntry.getTs(), new JsonDataEntry(tsKvEntry.getKey(), value.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user