refactor getInterval logic

This commit is contained in:
ShvaykaD 2022-09-16 16:11:25 +03:00
parent 189c1afb01
commit d8135f0782

View File

@ -1,12 +1,12 @@
/** /**
* Copyright © 2016-2022 The Thingsboard Authors * Copyright © 2016-2022 The Thingsboard Authors
* * <p>
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -110,11 +110,9 @@ public class TbGetTelemetryNode implements TbNode {
ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!"));
} else { } else {
try { try {
if (config.isUseMetadataIntervalPatterns()) { Interval interval = getInterval(msg);
checkKeyPatterns(msg);
}
List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg); List<String> keys = TbNodeUtils.processPatterns(tsKeyNames, msg);
ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys)); ListenableFuture<List<TsKvEntry>> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys));
DonAsynchron.withCallback(list, data -> { DonAsynchron.withCallback(list, data -> {
process(data, msg, keys); process(data, msg, keys);
ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData())); ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData()));
@ -129,8 +127,7 @@ public class TbGetTelemetryNode implements TbNode {
public void destroy() { public void destroy() {
} }
private List<ReadTsKvQuery> buildQueries(TbMsg msg, List<String> keys) { private List<ReadTsKvQuery> buildQueries(Interval interval, List<String> keys) {
final Interval interval = getInterval(msg);
final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 : final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 :
// exact how it validates on BaseTimeseriesService.validate() // exact how it validates on BaseTimeseriesService.validate()
// see CassandraBaseTimeseriesDao.findAllAsync() // see CassandraBaseTimeseriesDao.findAllAsync()
@ -210,72 +207,45 @@ public class TbGetTelemetryNode implements TbNode {
} }
private Interval getInterval(TbMsg msg) { private Interval getInterval(TbMsg msg) {
Interval interval = new Interval();
if (config.isUseMetadataIntervalPatterns()) { if (config.isUseMetadataIntervalPatterns()) {
if (isParsable(msg, config.getStartIntervalPattern())) { return getIntervalFromPatterns(msg);
interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg)));
}
if (isParsable(msg, config.getEndIntervalPattern())) {
interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg)));
}
} else { } else {
Interval interval = new Interval();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval())); interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval()));
interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval())); interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval()));
return interval;
} }
}
private Interval getIntervalFromPatterns(TbMsg msg) {
Interval interval = new Interval();
interval.setStartTs(checkPattern(msg, config.getStartIntervalPattern()));
interval.setEndTs(checkPattern(msg, config.getEndIntervalPattern()));
return interval; return interval;
} }
private boolean isParsable(TbMsg msg, String pattern) { private long checkPattern(TbMsg msg, String pattern) {
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); String value = getValuePattern(msg, pattern);
} if (value == null) {
throw new IllegalArgumentException("Message value: '" +
private void checkKeyPatterns(TbMsg msg) { replaceRegex(pattern) + "' is undefined");
isUndefined(msg, config.getStartIntervalPattern(), config.getEndIntervalPattern());
isInvalid(msg, config.getStartIntervalPattern(), config.getEndIntervalPattern());
}
private void isUndefined(TbMsg msg, String startIntervalPattern, String endIntervalPattern) {
if (getValuePattern(msg, startIntervalPattern) == null && getValuePattern(msg, endIntervalPattern) == null) {
throw new IllegalArgumentException("Message values: '" +
replaceRegex(startIntervalPattern) + "' and '" +
replaceRegex(endIntervalPattern) + "' are undefined");
} else {
if (getValuePattern(msg, startIntervalPattern) == null) {
throw new IllegalArgumentException("Message value: '" +
replaceRegex(startIntervalPattern) + "' is undefined");
}
if (getValuePattern(msg, endIntervalPattern) == null) {
throw new IllegalArgumentException("Message value: '" +
replaceRegex(endIntervalPattern) + "' is undefined");
}
} }
} boolean parsable = NumberUtils.isParsable(value);
if (!parsable) {
private void isInvalid(TbMsg msg, String startIntervalPattern, String endIntervalPattern) { throw new IllegalArgumentException("Message value: '" +
if (getInterval(msg).getStartTs() == null && getInterval(msg).getEndTs() == null) { replaceRegex(pattern) + "' has invalid format");
throw new IllegalArgumentException("Message values: '" +
replaceRegex(startIntervalPattern) + "' and '" +
replaceRegex(endIntervalPattern) + "' have invalid format");
} else {
if (getInterval(msg).getStartTs() == null) {
throw new IllegalArgumentException("Message value: '" +
replaceRegex(startIntervalPattern) + "' has invalid format");
}
if (getInterval(msg).getEndTs() == null) {
throw new IllegalArgumentException("Message value: '" +
replaceRegex(endIntervalPattern) + "' has invalid format");
}
} }
return Long.parseLong(value);
} }
private String getValuePattern(TbMsg msg, String pattern) { private String getValuePattern(TbMsg msg, String pattern) {
String valuePattern = TbNodeUtils.processPattern(pattern, msg); String value = TbNodeUtils.processPattern(pattern, msg);
return valuePattern.equals(pattern) ? null : valuePattern; return value.equals(pattern) ? null : value;
} }
private String replaceRegex(String pattern) { private String replaceRegex(String pattern) {
return pattern.replaceAll("[${}]", ""); return pattern.replaceAll("[$\\[{}\\]]", "");
} }
private int validateLimit(int limit) { private int validateLimit(int limit) {