add options fromMetadata
change logic node
This commit is contained in:
parent
fd9736b051
commit
f723c3c847
@ -32,59 +32,77 @@ import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Slf4j
|
||||
@RuleNode(
|
||||
type = ComponentType.TRANSFORMATION,
|
||||
name = "copy from metadata to msg",
|
||||
configClazz = TbCopyFromMdToMsgNodeConfiguration.class,
|
||||
nodeDescription = "Copies the msg metadata keys to msg data with specified key names selected in the list",
|
||||
nodeDetails = "Will fetch fields values specified in list. If specified field is not part of msg metadata fields it will be ignored." +
|
||||
name = "copy keys",
|
||||
configClazz = TbCopyKeysNodeConfiguration.class,
|
||||
nodeDescription = "Copies the msg or metadata keys with specified key names selected in the list",
|
||||
nodeDetails = "Will fetch fields values specified in list. If specified field is not part of msg or metadata fields it will be ignored." +
|
||||
"If the msg is not a JSON object returns the incoming message as outbound message with <code>Failure</code> chain, " +
|
||||
"otherwise returns transformed messages via <code>Success</code> chain",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js"},
|
||||
configDirective = "tbTransformationNodeCopyFromMdToMsgConfig",
|
||||
icon = "functions"
|
||||
configDirective = "tbTransformationNodeCopyKeysConfig",
|
||||
icon = "content_copy"
|
||||
)
|
||||
public class TbCopyFromMdToMsgNode implements TbNode {
|
||||
public class TbCopyKeysNode implements TbNode {
|
||||
|
||||
TbCopyFromMdToMsgNodeConfiguration config;
|
||||
TbCopyKeysNodeConfiguration config;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbCopyFromMdToMsgNodeConfiguration.class);
|
||||
this.config = TbNodeUtils.convert(configuration, TbCopyKeysNodeConfiguration.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
|
||||
processCopy(ctx, msg);
|
||||
List<String> keys = config.getKeys();
|
||||
if (CollectionUtils.isEmpty(keys)) {
|
||||
ctx.tellSuccess(msg);
|
||||
} else {
|
||||
TbMsgMetaData metaData = msg.getMetaData();
|
||||
String msgData = msg.getData();
|
||||
JsonNode dataNode = JacksonUtil.toJsonNode(msgData);
|
||||
if (!dataNode.isObject()) {
|
||||
ctx.tellFailure(msg, new RuntimeException("Msg data is not a JSON Object!"));
|
||||
return;
|
||||
}
|
||||
if (config.isFromMetadata()) {
|
||||
ObjectNode msgDataNode = (ObjectNode) dataNode;
|
||||
Map<String, String> metaDataMap = metaData.getData();
|
||||
keys.forEach(key -> {
|
||||
Pattern pattern = Pattern.compile(key);
|
||||
metaDataMap.forEach((keyMetaData, valueMetaData) -> {
|
||||
if (pattern.matcher(keyMetaData).matches()) {
|
||||
if (!StringUtils.isEmpty(valueMetaData)) {
|
||||
msgDataNode.put(keyMetaData, valueMetaData);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
msgData = JacksonUtil.toString(msgDataNode);
|
||||
} else {
|
||||
keys.forEach(key -> {
|
||||
Pattern pattern = Pattern.compile(key);
|
||||
dataNode.fields().forEachRemaining(entry -> {
|
||||
String keyData = entry.getKey();
|
||||
if (pattern.matcher(keyData).matches()) {
|
||||
metaData.putValue(keyData, String.valueOf(entry.getValue()));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), metaData, msgData));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
private void processCopy(TbContext ctx, TbMsg msg) {
|
||||
List<String> metadataMsgKeys = config.getMetadataMsgKeys();
|
||||
if (CollectionUtils.isEmpty(metadataMsgKeys)) {
|
||||
ctx.tellSuccess(msg);
|
||||
} else {
|
||||
JsonNode dataNode = JacksonUtil.toJsonNode(msg.getData());
|
||||
if (dataNode.isObject()) {
|
||||
ObjectNode msgData = (ObjectNode) dataNode;
|
||||
TbMsgMetaData metaData = msg.getMetaData();
|
||||
metadataMsgKeys.forEach(metadataKey -> {
|
||||
String value = metaData.getValue(metadataKey);
|
||||
if (!StringUtils.isEmpty(value)) {
|
||||
msgData.put(metadataKey, value);
|
||||
}
|
||||
});
|
||||
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), metaData, JacksonUtil.toString(msgData)));
|
||||
} else {
|
||||
ctx.tellFailure(msg, new RuntimeException("Msg data is not a JSON Object!"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,14 +22,16 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
public class TbCopyFromMdToMsgNodeConfiguration implements NodeConfiguration<TbCopyFromMdToMsgNodeConfiguration> {
|
||||
public class TbCopyKeysNodeConfiguration implements NodeConfiguration<TbCopyKeysNodeConfiguration> {
|
||||
|
||||
private List<String> metadataMsgKeys;
|
||||
private boolean fromMetadata;
|
||||
private List<String> keys;
|
||||
|
||||
@Override
|
||||
public TbCopyFromMdToMsgNodeConfiguration defaultConfiguration() {
|
||||
TbCopyFromMdToMsgNodeConfiguration configuration = new TbCopyFromMdToMsgNodeConfiguration();
|
||||
configuration.setMetadataMsgKeys(Collections.emptyList());
|
||||
public TbCopyKeysNodeConfiguration defaultConfiguration() {
|
||||
TbCopyKeysNodeConfiguration configuration = new TbCopyKeysNodeConfiguration();
|
||||
configuration.setKeys(Collections.emptyList());
|
||||
configuration.setFromMetadata(false);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
@ -44,12 +44,12 @@ import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TbCopyFromMdToMsgNodeTest {
|
||||
public class TbCopyKeysNodeTest {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
DeviceId deviceId;
|
||||
TbCopyFromMdToMsgNode node;
|
||||
TbCopyFromMdToMsgNodeConfiguration config;
|
||||
TbCopyKeysNode node;
|
||||
TbCopyKeysNodeConfiguration config;
|
||||
TbNodeConfiguration nodeConfiguration;
|
||||
TbContext ctx;
|
||||
TbMsgCallback callback;
|
||||
@ -59,10 +59,11 @@ public class TbCopyFromMdToMsgNodeTest {
|
||||
deviceId = new DeviceId(UUID.randomUUID());
|
||||
callback = mock(TbMsgCallback.class);
|
||||
ctx = mock(TbContext.class);
|
||||
config = new TbCopyFromMdToMsgNodeConfiguration().defaultConfiguration();
|
||||
config.setMetadataMsgKeys(List.of("TestKey_1", "TestKey_2", "TestKey_3"));
|
||||
config = new TbCopyKeysNodeConfiguration().defaultConfiguration();
|
||||
config.setKeys(List.of("TestKey_1", "TestKey_2", "TestKey_3", "(\\w*)Data(\\w*)"));
|
||||
config.setFromMetadata(true);
|
||||
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||
node = spy(new TbCopyFromMdToMsgNode());
|
||||
node = spy(new TbCopyKeysNode());
|
||||
node.init(ctx, nodeConfiguration);
|
||||
}
|
||||
|
||||
@ -78,12 +79,13 @@ public class TbCopyFromMdToMsgNodeTest {
|
||||
|
||||
@Test
|
||||
void givenDefaultConfig_whenVerify_thenOK() {
|
||||
TbCopyFromMdToMsgNodeConfiguration defaultConfig = new TbCopyFromMdToMsgNodeConfiguration().defaultConfiguration();
|
||||
assertThat(defaultConfig.getMetadataMsgKeys()).isEqualTo(Collections.emptyList());
|
||||
TbCopyKeysNodeConfiguration defaultConfig = new TbCopyKeysNodeConfiguration().defaultConfiguration();
|
||||
assertThat(defaultConfig.getKeys()).isEqualTo(Collections.emptyList());
|
||||
assertThat(defaultConfig.isFromMetadata()).isEqualTo(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenMsg_whenOnMsg_thenVerifyOutput() throws Exception {
|
||||
void givenMsgFromMetadata_whenOnMsg_thenVerifyOutput() throws Exception {
|
||||
String data = "{}";
|
||||
node.onMsg(ctx, getTbMsg(deviceId, data));
|
||||
|
||||
@ -96,11 +98,33 @@ public class TbCopyFromMdToMsgNodeTest {
|
||||
|
||||
JsonNode dataNode = JacksonUtil.toJsonNode(newMsg.getData());
|
||||
assertThat(dataNode.has("TestKey_1")).isEqualTo(true);
|
||||
assertThat(dataNode.has("voltageDataValue")).isEqualTo(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenMsgFromMsg_whenOnMsg_thenVerifyOutput() throws Exception {
|
||||
config.setFromMetadata(false);
|
||||
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
String data = "{\"DigitData\":22.5,\"TempDataValue\":10.5}";
|
||||
node.onMsg(ctx, getTbMsg(deviceId, data));
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
|
||||
verify(ctx, never()).tellFailure(any(), any());
|
||||
|
||||
TbMsg newMsg = newMsgCaptor.getValue();
|
||||
assertThat(newMsg).isNotNull();
|
||||
|
||||
Map<String, String> metaDataMap = newMsg.getMetaData().getData();
|
||||
assertThat(metaDataMap.containsKey("DigitData")).isEqualTo(true);
|
||||
assertThat(metaDataMap.containsKey("TempDataValue")).isEqualTo(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenEmptyKeys_whenOnMsg_thenVerifyOutput() throws Exception {
|
||||
TbCopyFromMdToMsgNodeConfiguration defaultConfig = new TbCopyFromMdToMsgNodeConfiguration().defaultConfiguration();
|
||||
TbCopyKeysNodeConfiguration defaultConfig = new TbCopyKeysNodeConfiguration().defaultConfiguration();
|
||||
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(defaultConfig));
|
||||
node.init(ctx, nodeConfiguration);
|
||||
|
||||
@ -134,6 +158,7 @@ public class TbCopyFromMdToMsgNodeTest {
|
||||
final Map<String, String> mdMap = Map.of(
|
||||
"TestKey_1", "Test",
|
||||
"country", "US",
|
||||
"voltageDataValue", "220",
|
||||
"city", "NY"
|
||||
);
|
||||
return TbMsg.newMsg("POST_ATTRIBUTES_REQUEST", entityId, new TbMsgMetaData(mdMap), data, callback);
|
||||
Loading…
x
Reference in New Issue
Block a user