Merge pull request #8009 from ShvaykaD/bugfix/7998
[3.4.4] rollback "tellFailure" logic change in TbAbstractGetAttributesNode
This commit is contained in:
commit
9ff4a1d6e8
@ -110,9 +110,6 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
||||
getAttrAsync(ctx, entityId, SERVER_SCOPE, TbNodeUtils.processPatterns(config.getServerAttributeNames(), msg), failuresMap)
|
||||
);
|
||||
withCallback(allFutures, futuresList -> {
|
||||
if (!failuresMap.isEmpty()) {
|
||||
throw reportFailures(failuresMap);
|
||||
}
|
||||
TbMsgMetaData msgMetaData = msg.getMetaData().copy();
|
||||
futuresList.stream().filter(Objects::nonNull).forEach(kvEntriesMap -> {
|
||||
kvEntriesMap.forEach((keyScope, kvEntryList) -> {
|
||||
@ -127,10 +124,13 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
|
||||
});
|
||||
});
|
||||
});
|
||||
if (fetchToData) {
|
||||
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, JacksonUtil.toString(msgDataNode)));
|
||||
TbMsg outMsg = fetchToData ?
|
||||
TbMsg.transformMsgData(msg, JacksonUtil.toString(msgDataNode)) :
|
||||
TbMsg.transformMsg(msg, msgMetaData);
|
||||
if (failuresMap.isEmpty()) {
|
||||
ctx.tellSuccess(outMsg);
|
||||
} else {
|
||||
ctx.tellSuccess(TbMsg.transformMsg(msg, msg.getType(), msg.getOriginator(), msgMetaData, msg.getData()));
|
||||
ctx.tellFailure(outMsg, reportFailures(failuresMap));
|
||||
}
|
||||
}, t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
@ -134,16 +134,16 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
TbMsg resultMsg = checkMsg();
|
||||
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
|
||||
// check msg
|
||||
TbMsg resultMsg = checkMsg(true);
|
||||
|
||||
//check attributes
|
||||
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
|
||||
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
|
||||
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
|
||||
checkAttributes(resultMsg, false, "cs_", clientAttributes);
|
||||
checkAttributes(resultMsg, false, "ss_", serverAttributes);
|
||||
checkAttributes(resultMsg, false, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries
|
||||
checkTs(tsKeys, false, false, msgMetaData, null);
|
||||
checkTs(resultMsg, false, false, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -152,16 +152,16 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
TbMsg resultMsg = checkMsg();
|
||||
TbMsgMetaData msgMetaData = resultMsg.getMetaData();
|
||||
// check msg
|
||||
TbMsg resultMsg = checkMsg(true);
|
||||
|
||||
//check attributes
|
||||
checkAttributes(clientAttributes, "cs_", false, msgMetaData, null);
|
||||
checkAttributes(serverAttributes, "ss_", false, msgMetaData, null);
|
||||
checkAttributes(sharedAttributes, "shared_", false, msgMetaData, null);
|
||||
checkAttributes(resultMsg, false, "cs_", clientAttributes);
|
||||
checkAttributes(resultMsg, false, "ss_", serverAttributes);
|
||||
checkAttributes(resultMsg, false, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries with ts
|
||||
checkTs(tsKeys, false, true, msgMetaData, null);
|
||||
checkTs(resultMsg, false, true, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -170,16 +170,16 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
TbMsg resultMsg = checkMsg();
|
||||
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
|
||||
// check msg
|
||||
TbMsg resultMsg = checkMsg(true);
|
||||
|
||||
//check attributes
|
||||
checkAttributes(clientAttributes, "cs_", true, null, msgData);
|
||||
checkAttributes(serverAttributes, "ss_", true, null, msgData);
|
||||
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
|
||||
checkAttributes(resultMsg, true, "cs_", clientAttributes);
|
||||
checkAttributes(resultMsg, true, "ss_", serverAttributes);
|
||||
checkAttributes(resultMsg, true, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries
|
||||
checkTs(tsKeys, true, false, null, msgData);
|
||||
checkTs(resultMsg, true, false, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -188,16 +188,34 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
TbMsg resultMsg = checkMsg();
|
||||
JsonNode msgData = JacksonUtil.toJsonNode(resultMsg.getData());
|
||||
// check msg
|
||||
TbMsg resultMsg = checkMsg(true);
|
||||
|
||||
//check attributes
|
||||
checkAttributes(clientAttributes, "cs_", true, null, msgData);
|
||||
checkAttributes(serverAttributes, "ss_", true, null, msgData);
|
||||
checkAttributes(sharedAttributes, "shared_", true, null, msgData);
|
||||
checkAttributes(resultMsg, true, "cs_", clientAttributes);
|
||||
checkAttributes(resultMsg, true, "ss_", serverAttributes);
|
||||
checkAttributes(resultMsg, true, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries with ts
|
||||
checkTs(tsKeys, true, true, null, msgData);
|
||||
checkTs(resultMsg, true, true, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchToMetadata_whenOnMsg_then_failure() throws Exception {
|
||||
TbGetAttributesNode node = initNode(false, false, true);
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
// check msg
|
||||
TbMsg actualMsg = checkMsg(false);
|
||||
|
||||
//check attributes
|
||||
checkAttributes(actualMsg, false, "cs_", clientAttributes);
|
||||
checkAttributes(actualMsg, false, "ss_", serverAttributes);
|
||||
checkAttributes(actualMsg, false, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries with ts
|
||||
checkTs(actualMsg, false, false, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -206,33 +224,46 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
TbMsg msg = getTbMsg(originator);
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
|
||||
Mockito.verify(ctx, never()).tellSuccess(any());
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());
|
||||
// check msg
|
||||
TbMsg actualMsg = checkMsg(false);
|
||||
|
||||
Assert.assertSame(newMsgCaptor.getValue(), msg);
|
||||
Assert.assertNotNull(exceptionCaptor.getValue());
|
||||
//check attributes
|
||||
checkAttributes(actualMsg, true, "cs_", clientAttributes);
|
||||
checkAttributes(actualMsg, true, "ss_", serverAttributes);
|
||||
checkAttributes(actualMsg, true, "shared_", sharedAttributes);
|
||||
|
||||
//check timeseries with ts
|
||||
checkTs(actualMsg, true, true, tsKeys);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fetchToData_whenOnMsg_then_data_not_object_failure() throws Exception {
|
||||
public void fetchToData_whenOnMsg_and_data_is_not_object_then_failure() throws Exception {
|
||||
TbGetAttributesNode node = initNode(true, true, true);
|
||||
TbMsg msg = TbMsg.newMsg("TEST", originator, new TbMsgMetaData(), "[]");
|
||||
node.onMsg(ctx, msg);
|
||||
|
||||
ArgumentCaptor<TbMsg> newMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
|
||||
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(IllegalArgumentException.class);
|
||||
Mockito.verify(ctx, never()).tellSuccess(any());
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(newMsgCaptor.capture(), exceptionCaptor.capture());
|
||||
|
||||
Assert.assertSame(newMsgCaptor.getValue(), msg);
|
||||
Assert.assertSame(msg, newMsgCaptor.getValue());
|
||||
Assert.assertNotNull(exceptionCaptor.getValue());
|
||||
}
|
||||
|
||||
private TbMsg checkMsg() {
|
||||
private TbMsg checkMsg(boolean checkSuccess) {
|
||||
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
if (checkSuccess) {
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellSuccess(msgCaptor.capture());
|
||||
} else {
|
||||
ArgumentCaptor<RuntimeException> exceptionCaptor = ArgumentCaptor.forClass(RuntimeException.class);
|
||||
Mockito.verify(ctx, never()).tellSuccess(any());
|
||||
Mockito.verify(ctx, Mockito.timeout(5000)).tellFailure(msgCaptor.capture(), exceptionCaptor.capture());
|
||||
RuntimeException exception = exceptionCaptor.getValue();
|
||||
Assert.assertNotNull(exception);
|
||||
Assert.assertNotNull(exception.getMessage());
|
||||
Assert.assertTrue(exception.getMessage().startsWith("The following attribute/telemetry keys is not present in the DB:"));
|
||||
}
|
||||
|
||||
TbMsg resultMsg = msgCaptor.getValue();
|
||||
Assert.assertNotNull(resultMsg);
|
||||
@ -241,7 +272,8 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
return resultMsg;
|
||||
}
|
||||
|
||||
private void checkAttributes(List<String> attributes, String prefix, boolean fetchToData, TbMsgMetaData msgMetaData, JsonNode msgData) {
|
||||
private void checkAttributes(TbMsg actualMsg, boolean fetchToData, String prefix, List<String> attributes) {
|
||||
JsonNode msgData = JacksonUtil.toJsonNode(actualMsg.getData());
|
||||
attributes.stream()
|
||||
.filter(attribute -> !attribute.equals("unknown"))
|
||||
.forEach(attribute -> {
|
||||
@ -249,14 +281,15 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
if (fetchToData) {
|
||||
result = msgData.get(prefix + attribute).asText();
|
||||
} else {
|
||||
result = msgMetaData.getValue(prefix + attribute);
|
||||
result = actualMsg.getMetaData().getValue(prefix + attribute);
|
||||
}
|
||||
Assert.assertNotNull(result);
|
||||
Assert.assertEquals(attribute + "_value", result);
|
||||
});
|
||||
}
|
||||
|
||||
private void checkTs(List<String> tsKeys, boolean fetchToData, boolean getLatestValueWithTs, TbMsgMetaData msgMetaData, JsonNode msgData) {
|
||||
private void checkTs(TbMsg actualMsg, boolean fetchToData, boolean getLatestValueWithTs, List<String> tsKeys) {
|
||||
JsonNode msgData = JacksonUtil.toJsonNode(actualMsg.getData());
|
||||
long value = 1L;
|
||||
for (String key : tsKeys) {
|
||||
if (key.equals("unknown")) {
|
||||
@ -272,7 +305,7 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
if (fetchToData) {
|
||||
actualValue = JacksonUtil.toString(msgData.get(key));
|
||||
} else {
|
||||
actualValue = msgMetaData.getValue(key);
|
||||
actualValue = actualMsg.getMetaData().getValue(key);
|
||||
}
|
||||
Assert.assertNotNull(actualValue);
|
||||
Assert.assertEquals(expectedValue, actualValue);
|
||||
@ -303,7 +336,7 @@ public class TbAbstractGetAttributesNodeTest {
|
||||
msgMetaData.putValue("client_attr_metadata", "client_attr_3");
|
||||
msgMetaData.putValue("server_attr_metadata", "server_attr_3");
|
||||
|
||||
return TbMsg.newMsg("TEST", entityId, msgMetaData, msgData.toString());
|
||||
return TbMsg.newMsg("TEST", entityId, msgMetaData, JacksonUtil.toString(msgData));
|
||||
}
|
||||
|
||||
private List<String> getAttributeNames(String prefix) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user