Merge pull request #11878 from irynamatveieva/hotfix/originator-telemetry-node
Fixed originator telemetry node
This commit is contained in:
commit
32c9dca31d
@ -52,7 +52,7 @@ import java.util.stream.Collectors;
|
|||||||
@RuleNode(type = ComponentType.ENRICHMENT,
|
@RuleNode(type = ComponentType.ENRICHMENT,
|
||||||
name = "originator telemetry",
|
name = "originator telemetry",
|
||||||
configClazz = TbGetTelemetryNodeConfiguration.class,
|
configClazz = TbGetTelemetryNodeConfiguration.class,
|
||||||
version = 1,
|
version = 2,
|
||||||
nodeDescription = "Adds message originator telemetry for selected time range into message metadata",
|
nodeDescription = "Adds message originator telemetry for selected time range into message metadata",
|
||||||
nodeDetails = "Useful when you need to get telemetry data set from the message originator for a specific time range " +
|
nodeDetails = "Useful when you need to get telemetry data set from the message originator for a specific time range " +
|
||||||
"instead of fetching just the latest telemetry or if you need to get the closest telemetry to the fetch interval start or end. " +
|
"instead of fetching just the latest telemetry or if you need to get the closest telemetry to the fetch interval start or end. " +
|
||||||
@ -232,21 +232,21 @@ public class TbGetTelemetryNode implements TbNode {
|
|||||||
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
|
||||||
boolean hasChanges = false;
|
boolean hasChanges = false;
|
||||||
switch (fromVersion) {
|
switch (fromVersion) {
|
||||||
case 0 -> {
|
case 0: {
|
||||||
if (oldConfiguration.hasNonNull("fetchMode")) {
|
if (oldConfiguration.hasNonNull("fetchMode")) {
|
||||||
String fetchMode = oldConfiguration.get("fetchMode").asText();
|
String fetchMode = oldConfiguration.get("fetchMode").asText();
|
||||||
switch (fetchMode) {
|
switch (fetchMode) {
|
||||||
case "FIRST":
|
case "FIRST" -> {
|
||||||
((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
|
((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
|
||||||
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
break;
|
}
|
||||||
case "LAST":
|
case "LAST" -> {
|
||||||
((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
|
((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
|
||||||
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
break;
|
}
|
||||||
case "ALL":
|
case "ALL" -> {
|
||||||
if (oldConfiguration.has("orderBy") &&
|
if (oldConfiguration.has("orderBy") &&
|
||||||
(oldConfiguration.get("orderBy").isNull() || oldConfiguration.get("orderBy").asText().isEmpty())) {
|
(oldConfiguration.get("orderBy").isNull() || oldConfiguration.get("orderBy").asText().isEmpty())) {
|
||||||
((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
|
((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
|
||||||
@ -257,16 +257,33 @@ public class TbGetTelemetryNode implements TbNode {
|
|||||||
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
}
|
}
|
||||||
break;
|
}
|
||||||
default:
|
default -> {
|
||||||
((ObjectNode) oldConfiguration).put("fetchMode", FetchMode.LAST.name());
|
((ObjectNode) oldConfiguration).put("fetchMode", FetchMode.LAST.name());
|
||||||
((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
|
((ObjectNode) oldConfiguration).put("orderBy", Direction.DESC.name());
|
||||||
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
||||||
hasChanges = true;
|
hasChanges = true;
|
||||||
break;
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case 1: {
|
||||||
|
if (!oldConfiguration.hasNonNull("limit")) {
|
||||||
|
((ObjectNode) oldConfiguration).put("limit", 1000);
|
||||||
|
hasChanges = true;
|
||||||
|
}
|
||||||
|
if (oldConfiguration.has("fetchMode") && oldConfiguration.get("fetchMode").asText().equals("ALL")) {
|
||||||
|
if (!oldConfiguration.hasNonNull("aggregation")) {
|
||||||
|
((ObjectNode) oldConfiguration).put("aggregation", Aggregation.NONE.name());
|
||||||
|
hasChanges = true;
|
||||||
|
}
|
||||||
|
if (!oldConfiguration.hasNonNull("orderBy")) {
|
||||||
|
((ObjectNode) oldConfiguration).put("orderBy", Direction.ASC.name());
|
||||||
|
hasChanges = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return new TbPair<>(hasChanges, oldConfiguration);
|
return new TbPair<>(hasChanges, oldConfiguration);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -637,6 +637,38 @@ public class TbGetTelemetryNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
"endInterval": 1,
|
"endInterval": 1,
|
||||||
"endIntervalTimeUnit": "MINUTES"
|
"endIntervalTimeUnit": "MINUTES"
|
||||||
}
|
}
|
||||||
|
"""),
|
||||||
|
// config for version 0 (fetchMode is 'ALL' and limit, aggregation and orderBy do not exist)
|
||||||
|
Arguments.of(0,
|
||||||
|
"""
|
||||||
|
{
|
||||||
|
"latestTsKeyNames": ["key"],
|
||||||
|
"fetchMode": "ALL",
|
||||||
|
"useMetadataIntervalPatterns": false,
|
||||||
|
"startIntervalPattern": "",
|
||||||
|
"endIntervalPattern": "",
|
||||||
|
"startInterval": 2,
|
||||||
|
"startIntervalTimeUnit": "MINUTES",
|
||||||
|
"endInterval": 1,
|
||||||
|
"endIntervalTimeUnit": "MINUTES"
|
||||||
|
}
|
||||||
|
""",
|
||||||
|
true,
|
||||||
|
"""
|
||||||
|
{
|
||||||
|
"latestTsKeyNames": ["key"],
|
||||||
|
"aggregation": "NONE",
|
||||||
|
"fetchMode": "ALL",
|
||||||
|
"orderBy": "ASC",
|
||||||
|
"limit": 1000,
|
||||||
|
"useMetadataIntervalPatterns": false,
|
||||||
|
"startIntervalPattern": "",
|
||||||
|
"endIntervalPattern": "",
|
||||||
|
"startInterval": 2,
|
||||||
|
"startIntervalTimeUnit": "MINUTES",
|
||||||
|
"endInterval": 1,
|
||||||
|
"endIntervalTimeUnit": "MINUTES"
|
||||||
|
}
|
||||||
""")
|
""")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user