Improve data aggregator to send updates on late data
This commit is contained in:
parent
fbb3d85e08
commit
0f9cecafbd
@ -36,8 +36,56 @@ interface AggData {
|
||||
aggValue: any;
|
||||
}
|
||||
|
||||
interface AggregationMap {
|
||||
[key: string]: Map<number, AggData>;
|
||||
class AggDataMap {
|
||||
rangeChanged = false;
|
||||
private minTs = Number.MAX_SAFE_INTEGER;
|
||||
private map = new Map<number, AggData>();
|
||||
|
||||
set(ts: number, data: AggData) {
|
||||
if (ts < this.minTs) {
|
||||
this.rangeChanged = true;
|
||||
this.minTs = ts;
|
||||
}
|
||||
this.map.set(ts, data);
|
||||
}
|
||||
|
||||
get(ts: number): AggData {
|
||||
return this.map.get(ts);
|
||||
}
|
||||
|
||||
delete(ts: number) {
|
||||
this.map.delete(ts);
|
||||
}
|
||||
|
||||
forEach(callback: (value: AggData, key: number, map: Map<number, AggData>) => void, thisArg?: any) {
|
||||
this.map.forEach(callback, thisArg);
|
||||
}
|
||||
|
||||
size(): number {
|
||||
return this.map.size;
|
||||
}
|
||||
}
|
||||
|
||||
class AggregationMap {
|
||||
aggMap: {[key: string]: AggDataMap} = {};
|
||||
|
||||
detectRangeChanged(): boolean {
|
||||
let changed = false;
|
||||
for (const key of Object.keys(this.aggMap)) {
|
||||
const aggDataMap = this.aggMap[key];
|
||||
if (aggDataMap.rangeChanged) {
|
||||
changed = true;
|
||||
aggDataMap.rangeChanged = false;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
clearRangeChangedFlags() {
|
||||
for (const key of Object.keys(this.aggMap)) {
|
||||
this.aggMap[key].rangeChanged = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
declare type AggFunction = (aggData: AggData, value?: any) => void;
|
||||
@ -170,7 +218,7 @@ export class DataAggregator {
|
||||
updateIntervalScheduledTime = false;
|
||||
}
|
||||
if (update) {
|
||||
this.aggregationMap = {};
|
||||
this.aggregationMap = new AggregationMap();
|
||||
this.updateAggregatedData(data.data);
|
||||
} else {
|
||||
this.aggregationMap = this.processAggregatedData(data.data);
|
||||
@ -178,12 +226,17 @@ export class DataAggregator {
|
||||
if (updateIntervalScheduledTime) {
|
||||
this.intervalScheduledTime = this.utils.currentPerfTime();
|
||||
}
|
||||
this.aggregationMap.clearRangeChangedFlags();
|
||||
this.onInterval(history, detectChanges);
|
||||
} else {
|
||||
this.updateAggregatedData(data.data);
|
||||
if (history) {
|
||||
this.intervalScheduledTime = this.utils.currentPerfTime();
|
||||
this.onInterval(history, detectChanges);
|
||||
} else {
|
||||
if (this.aggregationMap.detectRangeChanged()) {
|
||||
this.onInterval(false, detectChanges, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -203,7 +256,7 @@ export class DataAggregator {
|
||||
}
|
||||
}
|
||||
|
||||
private onInterval(history?: boolean, detectChanges?: boolean) {
|
||||
private onInterval(history?: boolean, detectChanges?: boolean, rangeChanged?: boolean) {
|
||||
const now = this.utils.currentPerfTime();
|
||||
this.elapsed += now - this.intervalScheduledTime;
|
||||
this.intervalScheduledTime = now;
|
||||
@ -211,9 +264,10 @@ export class DataAggregator {
|
||||
clearTimeout(this.intervalTimeoutHandle);
|
||||
this.intervalTimeoutHandle = null;
|
||||
}
|
||||
const intervalTimeout = rangeChanged ? this.aggregationTimeout - this.elapsed : this.aggregationTimeout;
|
||||
if (!history) {
|
||||
const delta = Math.floor(this.elapsed / this.subsTw.aggregation.interval);
|
||||
if (delta || !this.data) {
|
||||
if (delta || !this.data || rangeChanged) {
|
||||
const tickTs = delta * this.subsTw.aggregation.interval;
|
||||
if (this.subsTw.quickInterval) {
|
||||
const currentDate = this.getCurrentTime();
|
||||
@ -234,7 +288,7 @@ export class DataAggregator {
|
||||
this.updatedData = false;
|
||||
}
|
||||
if (!history) {
|
||||
this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), this.aggregationTimeout);
|
||||
this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), intervalTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
@ -242,8 +296,8 @@ export class DataAggregator {
|
||||
this.tsKeyNames.forEach((key) => {
|
||||
this.dataBuffer[key] = [];
|
||||
});
|
||||
for (const key of Object.keys(this.aggregationMap)) {
|
||||
const aggKeyData = this.aggregationMap[key];
|
||||
for (const key of Object.keys(this.aggregationMap.aggMap)) {
|
||||
const aggKeyData = this.aggregationMap.aggMap[key];
|
||||
let keyData = this.dataBuffer[key];
|
||||
aggKeyData.forEach((aggData, aggTimestamp) => {
|
||||
if (aggTimestamp <= this.startTs) {
|
||||
@ -300,12 +354,12 @@ export class DataAggregator {
|
||||
|
||||
private processAggregatedData(data: SubscriptionData): AggregationMap {
|
||||
const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
|
||||
const aggregationMap: AggregationMap = {};
|
||||
const aggregationMap = new AggregationMap();
|
||||
for (const key of Object.keys(data)) {
|
||||
let aggKeyData = aggregationMap[key];
|
||||
let aggKeyData = aggregationMap.aggMap[key];
|
||||
if (!aggKeyData) {
|
||||
aggKeyData = new Map<number, AggData>();
|
||||
aggregationMap[key] = aggKeyData;
|
||||
aggKeyData = new AggDataMap();
|
||||
aggregationMap.aggMap[key] = aggKeyData;
|
||||
}
|
||||
const keyData = data[key];
|
||||
keyData.forEach((kvPair) => {
|
||||
@ -326,10 +380,10 @@ export class DataAggregator {
|
||||
private updateAggregatedData(data: SubscriptionData) {
|
||||
const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
|
||||
for (const key of Object.keys(data)) {
|
||||
let aggKeyData = this.aggregationMap[key];
|
||||
let aggKeyData = this.aggregationMap.aggMap[key];
|
||||
if (!aggKeyData) {
|
||||
aggKeyData = new Map<number, AggData>();
|
||||
this.aggregationMap[key] = aggKeyData;
|
||||
aggKeyData = new AggDataMap();
|
||||
this.aggregationMap.aggMap[key] = aggKeyData;
|
||||
}
|
||||
const keyData = data[key];
|
||||
keyData.forEach((kvPair) => {
|
||||
@ -374,4 +428,3 @@ export class DataAggregator {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user