UI: Refactor data aggregator to support time-series aggregation key processing in latest widgets
This commit is contained in:
parent
8d303be09d
commit
31276ddb42
@ -18,7 +18,6 @@ import { AggKey, IndexedSubscriptionData, } from '@app/shared/models/telemetry/t
|
|||||||
import {
|
import {
|
||||||
AggregationType,
|
AggregationType,
|
||||||
calculateAggIntervalWithSubscriptionTimeWindow,
|
calculateAggIntervalWithSubscriptionTimeWindow,
|
||||||
calculateInterval,
|
|
||||||
calculateIntervalComparisonEndTime,
|
calculateIntervalComparisonEndTime,
|
||||||
calculateIntervalEndTime,
|
calculateIntervalEndTime,
|
||||||
calculateIntervalStartEndTime,
|
calculateIntervalStartEndTime,
|
||||||
@ -28,7 +27,7 @@ import {
|
|||||||
SubscriptionTimewindow
|
SubscriptionTimewindow
|
||||||
} from '@shared/models/time/time.models';
|
} from '@shared/models/time/time.models';
|
||||||
import { UtilsService } from '@core/services/utils.service';
|
import { UtilsService } from '@core/services/utils.service';
|
||||||
import { deepClone, isDefined, isDefinedAndNotNull, isNumber, isNumeric } from '@core/utils';
|
import { deepClone, isDefinedAndNotNull, isNumber, isNumeric } from '@core/utils';
|
||||||
import { DataEntry, DataSet, IndexedData } from '@shared/models/widget.models';
|
import { DataEntry, DataSet, IndexedData } from '@shared/models/widget.models';
|
||||||
import BTree from 'sorted-btree';
|
import BTree from 'sorted-btree';
|
||||||
import Timeout = NodeJS.Timeout;
|
import Timeout = NodeJS.Timeout;
|
||||||
@ -49,7 +48,8 @@ class AggDataMap {
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private subsTw: SubscriptionTimewindow,
|
private subsTw: SubscriptionTimewindow,
|
||||||
private endTs: number
|
private endTs: number,
|
||||||
|
private aggType: AggregationType
|
||||||
){};
|
){};
|
||||||
|
|
||||||
set(ts: number, data: AggData) {
|
set(ts: number, data: AggData) {
|
||||||
@ -64,9 +64,9 @@ class AggDataMap {
|
|||||||
this.map.delete(ts);
|
this.map.delete(ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
findDataForTs(ts: number, noAggregation: boolean): AggData | undefined {
|
findDataForTs(ts: number): AggData | undefined {
|
||||||
if (ts >= this.endTs) {
|
if (ts >= this.endTs) {
|
||||||
this.updateLastInterval(ts + 1, noAggregation);
|
this.updateLastInterval(ts + 1);
|
||||||
}
|
}
|
||||||
const pair = this.map.getPairOrNextLower(ts, this.reusePair);
|
const pair = this.map.getPairOrNextLower(ts, this.reusePair);
|
||||||
if (pair) {
|
if (pair) {
|
||||||
@ -79,17 +79,16 @@ class AggDataMap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
calculateAggInterval(timestamp: number): [number, number] {
|
calculateAggInterval(timestamp: number): [number, number] {
|
||||||
return calculateInterval(this.subsTw.startTs, this.endTs, this.subsTw.aggregation.interval, this.subsTw.tsOffset, this.subsTw.timezone, timestamp);
|
return calculateAggIntervalWithSubscriptionTimeWindow(this.subsTw, this.endTs, timestamp, this.aggType);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateLastInterval(endTs: number, noAggregation?: boolean) {
|
updateLastInterval(endTs: number) {
|
||||||
if (endTs > this.endTs) {
|
if (endTs > this.endTs) {
|
||||||
this.endTs = endTs;
|
this.endTs = endTs;
|
||||||
const lastTs = this.map.maxKey();
|
const lastTs = this.map.maxKey();
|
||||||
if (lastTs) {
|
if (lastTs) {
|
||||||
const data = this.map.get(lastTs);
|
const data = this.map.get(lastTs);
|
||||||
const interval = isDefined(noAggregation) && !noAggregation ? this.calculateAggInterval(data.ts) :
|
const interval = calculateAggIntervalWithSubscriptionTimeWindow(this.subsTw, endTs, data.ts, this.aggType);
|
||||||
calculateAggIntervalWithSubscriptionTimeWindow(this.subsTw, endTs, data.ts);
|
|
||||||
data.interval = interval;
|
data.interval = interval;
|
||||||
data.ts = interval[0] + Math.floor((interval[1] - interval[0]) / 2);
|
data.ts = interval[0] + Math.floor((interval[1] - interval[0]) / 2);
|
||||||
}
|
}
|
||||||
@ -417,7 +416,7 @@ export class DataAggregator {
|
|||||||
const noAggregation = aggType === AggregationType.NONE;
|
const noAggregation = aggType === AggregationType.NONE;
|
||||||
let aggKeyData = aggregationMap.aggMap[id];
|
let aggKeyData = aggregationMap.aggMap[id];
|
||||||
if (!aggKeyData) {
|
if (!aggKeyData) {
|
||||||
aggKeyData = new AggDataMap(this.subsTw, this.endTs);
|
aggKeyData = new AggDataMap(this.subsTw, this.endTs, aggType);
|
||||||
aggregationMap.aggMap[id] = aggKeyData;
|
aggregationMap.aggMap[id] = aggKeyData;
|
||||||
}
|
}
|
||||||
const keyData = data[id];
|
const keyData = data[id];
|
||||||
@ -451,14 +450,14 @@ export class DataAggregator {
|
|||||||
const noAggregation = aggType === AggregationType.NONE;
|
const noAggregation = aggType === AggregationType.NONE;
|
||||||
let aggKeyData = this.aggregationMap.aggMap[id];
|
let aggKeyData = this.aggregationMap.aggMap[id];
|
||||||
if (!aggKeyData) {
|
if (!aggKeyData) {
|
||||||
aggKeyData = new AggDataMap(this.subsTw, this.endTs);
|
aggKeyData = new AggDataMap(this.subsTw, this.endTs, aggType);
|
||||||
this.aggregationMap.aggMap[id] = aggKeyData;
|
this.aggregationMap.aggMap[id] = aggKeyData;
|
||||||
}
|
}
|
||||||
const keyData = data[id];
|
const keyData = data[id];
|
||||||
keyData.forEach((kvPair) => {
|
keyData.forEach((kvPair) => {
|
||||||
const timestamp = kvPair[0];
|
const timestamp = kvPair[0];
|
||||||
const value = DataAggregator.convertValue(kvPair[1], noAggregation);
|
const value = DataAggregator.convertValue(kvPair[1], noAggregation);
|
||||||
let aggData = aggKeyData.findDataForTs(timestamp, noAggregation);
|
let aggData = aggKeyData.findDataForTs(timestamp);
|
||||||
if (!aggData) {
|
if (!aggData) {
|
||||||
let interval: [number, number] = [timestamp, timestamp];
|
let interval: [number, number] = [timestamp, timestamp];
|
||||||
if (!noAggregation) {
|
if (!noAggregation) {
|
||||||
|
|||||||
@ -1116,8 +1116,8 @@ export const endIntervalDate = (current: moment_.Moment, interval: IntervalType)
|
|||||||
};
|
};
|
||||||
|
|
||||||
export const calculateAggIntervalWithSubscriptionTimeWindow
|
export const calculateAggIntervalWithSubscriptionTimeWindow
|
||||||
= (subsTw: SubscriptionTimewindow, endTs: number, timestamp: number): [number, number] => {
|
= (subsTw: SubscriptionTimewindow, endTs: number, timestamp: number, aggType?: AggregationType): [number, number] => {
|
||||||
if (subsTw.aggregation.type === AggregationType.NONE) {
|
if ((aggType || subsTw.aggregation.type) === AggregationType.NONE) {
|
||||||
return [timestamp, timestamp];
|
return [timestamp, timestamp];
|
||||||
} else {
|
} else {
|
||||||
return calculateInterval(subsTw.startTs, endTs, subsTw.aggregation.interval, subsTw.tsOffset, subsTw.timezone, timestamp);
|
return calculateInterval(subsTw.startTs, endTs, subsTw.aggregation.interval, subsTw.tsOffset, subsTw.timezone, timestamp);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user