diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java index f95909e813..478e49f004 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetTelemetryNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2022 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -110,11 +110,9 @@ public class TbGetTelemetryNode implements TbNode { ctx.tellFailure(msg, new IllegalStateException("Telemetry is not selected!")); } else { try { - if (config.isUseMetadataIntervalPatterns()) { - checkKeyPatterns(msg); - } + Interval interval = getInterval(msg); List keys = TbNodeUtils.processPatterns(tsKeyNames, msg); - ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(msg, keys)); + ListenableFuture> list = ctx.getTimeseriesService().findAll(ctx.getTenantId(), msg.getOriginator(), buildQueries(interval, keys)); DonAsynchron.withCallback(list, data -> { process(data, msg, keys); ctx.tellSuccess(ctx.transformMsg(msg, msg.getType(), msg.getOriginator(), msg.getMetaData(), msg.getData())); @@ -129,8 +127,7 @@ public class TbGetTelemetryNode implements TbNode { public void destroy() { } - private List buildQueries(TbMsg msg, List keys) { - final Interval interval = getInterval(msg); + private List buildQueries(Interval interval, List keys) { final long aggIntervalStep = Aggregation.NONE.equals(aggregation) ? 1 : // exact how it validates on BaseTimeseriesService.validate() // see CassandraBaseTimeseriesDao.findAllAsync() @@ -210,72 +207,45 @@ public class TbGetTelemetryNode implements TbNode { } private Interval getInterval(TbMsg msg) { - Interval interval = new Interval(); if (config.isUseMetadataIntervalPatterns()) { - if (isParsable(msg, config.getStartIntervalPattern())) { - interval.setStartTs(Long.parseLong(TbNodeUtils.processPattern(config.getStartIntervalPattern(), msg))); - } - if (isParsable(msg, config.getEndIntervalPattern())) { - interval.setEndTs(Long.parseLong(TbNodeUtils.processPattern(config.getEndIntervalPattern(), msg))); - } + return getIntervalFromPatterns(msg); } else { + Interval interval = new Interval(); long ts = System.currentTimeMillis(); interval.setStartTs(ts - TimeUnit.valueOf(config.getStartIntervalTimeUnit()).toMillis(config.getStartInterval())); interval.setEndTs(ts - TimeUnit.valueOf(config.getEndIntervalTimeUnit()).toMillis(config.getEndInterval())); + return interval; } + } + + private Interval getIntervalFromPatterns(TbMsg msg) { + Interval interval = new Interval(); + interval.setStartTs(checkPattern(msg, config.getStartIntervalPattern())); + interval.setEndTs(checkPattern(msg, config.getEndIntervalPattern())); return interval; } - private boolean isParsable(TbMsg msg, String pattern) { - return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg)); - } - - private void checkKeyPatterns(TbMsg msg) { - isUndefined(msg, config.getStartIntervalPattern(), config.getEndIntervalPattern()); - isInvalid(msg, config.getStartIntervalPattern(), config.getEndIntervalPattern()); - } - - private void isUndefined(TbMsg msg, String startIntervalPattern, String endIntervalPattern) { - if (getValuePattern(msg, startIntervalPattern) == null && getValuePattern(msg, endIntervalPattern) == null) { - throw new IllegalArgumentException("Message values: '" + - replaceRegex(startIntervalPattern) + "' and '" + - replaceRegex(endIntervalPattern) + "' are undefined"); - } else { - if (getValuePattern(msg, startIntervalPattern) == null) { - throw new IllegalArgumentException("Message value: '" + - replaceRegex(startIntervalPattern) + "' is undefined"); - } - if (getValuePattern(msg, endIntervalPattern) == null) { - throw new IllegalArgumentException("Message value: '" + - replaceRegex(endIntervalPattern) + "' is undefined"); - } + private long checkPattern(TbMsg msg, String pattern) { + String value = getValuePattern(msg, pattern); + if (value == null) { + throw new IllegalArgumentException("Message value: '" + + replaceRegex(pattern) + "' is undefined"); } - } - - private void isInvalid(TbMsg msg, String startIntervalPattern, String endIntervalPattern) { - if (getInterval(msg).getStartTs() == null && getInterval(msg).getEndTs() == null) { - throw new IllegalArgumentException("Message values: '" + - replaceRegex(startIntervalPattern) + "' and '" + - replaceRegex(endIntervalPattern) + "' have invalid format"); - } else { - if (getInterval(msg).getStartTs() == null) { - throw new IllegalArgumentException("Message value: '" + - replaceRegex(startIntervalPattern) + "' has invalid format"); - } - if (getInterval(msg).getEndTs() == null) { - throw new IllegalArgumentException("Message value: '" + - replaceRegex(endIntervalPattern) + "' has invalid format"); - } + boolean parsable = NumberUtils.isParsable(value); + if (!parsable) { + throw new IllegalArgumentException("Message value: '" + + replaceRegex(pattern) + "' has invalid format"); } + return Long.parseLong(value); } private String getValuePattern(TbMsg msg, String pattern) { - String valuePattern = TbNodeUtils.processPattern(pattern, msg); - return valuePattern.equals(pattern) ? null : valuePattern; + String value = TbNodeUtils.processPattern(pattern, msg); + return value.equals(pattern) ? null : value; } private String replaceRegex(String pattern) { - return pattern.replaceAll("[${}]", ""); + return pattern.replaceAll("[$\\[{}\\]]", ""); } private int validateLimit(int limit) {