refactoring TbGetTelemetryNode
This commit is contained in:
parent
b33b682cc6
commit
e195d06cb6
@ -70,6 +70,7 @@ public class TbGetTelemetryNode implements TbNode {
|
|||||||
private int limit;
|
private int limit;
|
||||||
private ObjectMapper mapper;
|
private ObjectMapper mapper;
|
||||||
private String fetchMode;
|
private String fetchMode;
|
||||||
|
private String orderBy;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
@ -77,6 +78,7 @@ public class TbGetTelemetryNode implements TbNode {
|
|||||||
tsKeyNames = config.getLatestTsKeyNames();
|
tsKeyNames = config.getLatestTsKeyNames();
|
||||||
limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1;
|
limit = config.getFetchMode().equals(FETCH_MODE_ALL) ? MAX_FETCH_SIZE : 1;
|
||||||
fetchMode = config.getFetchMode();
|
fetchMode = config.getFetchMode();
|
||||||
|
orderBy = config.getOrderBy();
|
||||||
mapper = new ObjectMapper();
|
mapper = new ObjectMapper();
|
||||||
mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
|
mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
|
||||||
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
|
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
|
||||||
@ -104,17 +106,9 @@ public class TbGetTelemetryNode implements TbNode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() { }
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<ReadTsKvQuery> buildQueries(TbMsg msg) {
|
private List<ReadTsKvQuery> buildQueries(TbMsg msg) {
|
||||||
String orderBy;
|
|
||||||
if (fetchMode.equals(FETCH_MODE_FIRST) || fetchMode.equals(FETCH_MODE_ALL)) {
|
|
||||||
orderBy = "ASC";
|
|
||||||
} else {
|
|
||||||
orderBy = "DESC";
|
|
||||||
}
|
|
||||||
return tsKeyNames.stream()
|
return tsKeyNames.stream()
|
||||||
.map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, orderBy))
|
.map(key -> new BaseReadTsKvQuery(key, getInterval(msg).getStartTs(), getInterval(msg).getEndTs(), 1, limit, NONE, orderBy))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|||||||
@ -44,11 +44,10 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
|
|||||||
private String startIntervalTimeUnit;
|
private String startIntervalTimeUnit;
|
||||||
private String endIntervalTimeUnit;
|
private String endIntervalTimeUnit;
|
||||||
private String fetchMode; //FIRST, LAST, LATEST
|
private String fetchMode; //FIRST, LAST, LATEST
|
||||||
|
private String orderBy; //ASC, DESC,
|
||||||
|
|
||||||
private List<String> latestTsKeyNames;
|
private List<String> latestTsKeyNames;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbGetTelemetryNodeConfiguration defaultConfiguration() {
|
public TbGetTelemetryNodeConfiguration defaultConfiguration() {
|
||||||
TbGetTelemetryNodeConfiguration configuration = new TbGetTelemetryNodeConfiguration();
|
TbGetTelemetryNodeConfiguration configuration = new TbGetTelemetryNodeConfiguration();
|
||||||
@ -61,6 +60,7 @@ public class TbGetTelemetryNodeConfiguration implements NodeConfiguration<TbGetT
|
|||||||
configuration.setUseMetadataIntervalPatterns(false);
|
configuration.setUseMetadataIntervalPatterns(false);
|
||||||
configuration.setStartIntervalPattern("");
|
configuration.setStartIntervalPattern("");
|
||||||
configuration.setEndIntervalPattern("");
|
configuration.setEndIntervalPattern("");
|
||||||
|
configuration.setOrderBy("ASC");
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user