From 0f9cecafbd7d5399506108f6227de2d4dad6ad4a Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Fri, 19 Mar 2021 15:56:12 +0200 Subject: [PATCH] Improve data aggregator to send updates on late data --- ui-ngx/src/app/core/api/data-aggregator.ts | 85 ++++++++++++++++++---- 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/ui-ngx/src/app/core/api/data-aggregator.ts b/ui-ngx/src/app/core/api/data-aggregator.ts index 1749d89e4e..2444f11dec 100644 --- a/ui-ngx/src/app/core/api/data-aggregator.ts +++ b/ui-ngx/src/app/core/api/data-aggregator.ts @@ -36,8 +36,56 @@ interface AggData { aggValue: any; } -interface AggregationMap { - [key: string]: Map; +class AggDataMap { + rangeChanged = false; + private minTs = Number.MAX_SAFE_INTEGER; + private map = new Map(); + + set(ts: number, data: AggData) { + if (ts < this.minTs) { + this.rangeChanged = true; + this.minTs = ts; + } + this.map.set(ts, data); + } + + get(ts: number): AggData { + return this.map.get(ts); + } + + delete(ts: number) { + this.map.delete(ts); + } + + forEach(callback: (value: AggData, key: number, map: Map) => void, thisArg?: any) { + this.map.forEach(callback, thisArg); + } + + size(): number { + return this.map.size; + } +} + +class AggregationMap { + aggMap: {[key: string]: AggDataMap} = {}; + + detectRangeChanged(): boolean { + let changed = false; + for (const key of Object.keys(this.aggMap)) { + const aggDataMap = this.aggMap[key]; + if (aggDataMap.rangeChanged) { + changed = true; + aggDataMap.rangeChanged = false; + } + } + return changed; + } + + clearRangeChangedFlags() { + for (const key of Object.keys(this.aggMap)) { + this.aggMap[key].rangeChanged = false; + } + } } declare type AggFunction = (aggData: AggData, value?: any) => void; @@ -170,7 +218,7 @@ export class DataAggregator { updateIntervalScheduledTime = false; } if (update) { - this.aggregationMap = {}; + this.aggregationMap = new AggregationMap(); this.updateAggregatedData(data.data); } else { this.aggregationMap = this.processAggregatedData(data.data); @@ -178,12 +226,17 @@ export class DataAggregator { if (updateIntervalScheduledTime) { this.intervalScheduledTime = this.utils.currentPerfTime(); } + this.aggregationMap.clearRangeChangedFlags(); this.onInterval(history, detectChanges); } else { this.updateAggregatedData(data.data); if (history) { this.intervalScheduledTime = this.utils.currentPerfTime(); this.onInterval(history, detectChanges); + } else { + if (this.aggregationMap.detectRangeChanged()) { + this.onInterval(false, detectChanges, true); + } } } } @@ -203,7 +256,7 @@ export class DataAggregator { } } - private onInterval(history?: boolean, detectChanges?: boolean) { + private onInterval(history?: boolean, detectChanges?: boolean, rangeChanged?: boolean) { const now = this.utils.currentPerfTime(); this.elapsed += now - this.intervalScheduledTime; this.intervalScheduledTime = now; @@ -211,9 +264,10 @@ export class DataAggregator { clearTimeout(this.intervalTimeoutHandle); this.intervalTimeoutHandle = null; } + const intervalTimeout = rangeChanged ? this.aggregationTimeout - this.elapsed : this.aggregationTimeout; if (!history) { const delta = Math.floor(this.elapsed / this.subsTw.aggregation.interval); - if (delta || !this.data) { + if (delta || !this.data || rangeChanged) { const tickTs = delta * this.subsTw.aggregation.interval; if (this.subsTw.quickInterval) { const currentDate = this.getCurrentTime(); @@ -234,7 +288,7 @@ export class DataAggregator { this.updatedData = false; } if (!history) { - this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), this.aggregationTimeout); + this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), intervalTimeout); } } @@ -242,8 +296,8 @@ export class DataAggregator { this.tsKeyNames.forEach((key) => { this.dataBuffer[key] = []; }); - for (const key of Object.keys(this.aggregationMap)) { - const aggKeyData = this.aggregationMap[key]; + for (const key of Object.keys(this.aggregationMap.aggMap)) { + const aggKeyData = this.aggregationMap.aggMap[key]; let keyData = this.dataBuffer[key]; aggKeyData.forEach((aggData, aggTimestamp) => { if (aggTimestamp <= this.startTs) { @@ -300,12 +354,12 @@ export class DataAggregator { private processAggregatedData(data: SubscriptionData): AggregationMap { const isCount = this.subsTw.aggregation.type === AggregationType.COUNT; - const aggregationMap: AggregationMap = {}; + const aggregationMap = new AggregationMap(); for (const key of Object.keys(data)) { - let aggKeyData = aggregationMap[key]; + let aggKeyData = aggregationMap.aggMap[key]; if (!aggKeyData) { - aggKeyData = new Map(); - aggregationMap[key] = aggKeyData; + aggKeyData = new AggDataMap(); + aggregationMap.aggMap[key] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { @@ -326,10 +380,10 @@ export class DataAggregator { private updateAggregatedData(data: SubscriptionData) { const isCount = this.subsTw.aggregation.type === AggregationType.COUNT; for (const key of Object.keys(data)) { - let aggKeyData = this.aggregationMap[key]; + let aggKeyData = this.aggregationMap.aggMap[key]; if (!aggKeyData) { - aggKeyData = new Map(); - this.aggregationMap[key] = aggKeyData; + aggKeyData = new AggDataMap(); + this.aggregationMap.aggMap[key] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { @@ -374,4 +428,3 @@ export class DataAggregator { } } -