From c6c40f50a8c1fdba6f50e471aa056af168c1ecb9 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Mon, 12 Sep 2022 17:26:49 +0300 Subject: [PATCH] UI: Aggregation improvements --- ui-ngx/src/app/core/api/data-aggregator.ts | 146 ++++++++++-------- .../app/core/api/entity-data-subscription.ts | 70 ++++----- 2 files changed, 114 insertions(+), 102 deletions(-) diff --git a/ui-ngx/src/app/core/api/data-aggregator.ts b/ui-ngx/src/app/core/api/data-aggregator.ts index 2f86b4a4ce..a17f59dcb0 100644 --- a/ui-ngx/src/app/core/api/data-aggregator.ts +++ b/ui-ngx/src/app/core/api/data-aggregator.ts @@ -67,23 +67,30 @@ class AggDataMap { } class AggregationMap { - aggMap: {[aggKey: string]: AggDataMap} = {}; + aggMap: {[aggType: string]: {[key: string]: AggDataMap}} = {}; detectRangeChanged(): boolean { let changed = false; - for (const aggKey of Object.keys(this.aggMap)) { - const aggDataMap = this.aggMap[aggKey]; - if (aggDataMap.rangeChanged) { - changed = true; - aggDataMap.rangeChanged = false; + for (const aggType of Object.keys(this.aggMap)) { + const aggKeyMap = this.aggMap[aggType]; + for (const key of Object.keys(aggKeyMap)) { + const aggDataMap = aggKeyMap[key]; + if (aggDataMap.rangeChanged) { + changed = true; + aggDataMap.rangeChanged = false; + } } } return changed; } clearRangeChangedFlags() { - for (const aggKey of Object.keys(this.aggMap)) { - this.aggMap[aggKey].rangeChanged = false; + for (const aggType of Object.keys(this.aggMap)) { + const aggKeyMap = this.aggMap[aggType]; + for (const key of Object.keys(aggKeyMap)) { + const aggDataMap = aggKeyMap[key]; + aggDataMap.rangeChanged = false; + } } } } @@ -202,13 +209,6 @@ export class DataAggregator { return `${aggKey.key}_${aggKey.agg}`; } - private static aggKeyFromString(aggKeyString: string): AggKey { - const separatorIndex = aggKeyString.lastIndexOf('_'); - const key = aggKeyString.substring(0, separatorIndex); - const agg = AggregationType[aggKeyString.substring(separatorIndex + 1)]; - return { key, agg }; - } - public updateOnDataCb(newOnDataCb: onAggregatedData): onAggregatedData { const prevOnDataCb = this.onDataCb; this.onDataCb = newOnDataCb; @@ -252,11 +252,11 @@ export class DataAggregator { this.resetPending = false; updateIntervalScheduledTime = false; } + this.aggregationMap = new AggregationMap(); if (update) { - this.aggregationMap = new AggregationMap(); this.updateAggregatedData(aggType, data.data); } else { - this.aggregationMap = this.processAggregatedData(aggType, data.data); + this.processAggregatedData(aggType, data.data); } if (updateIntervalScheduledTime) { this.intervalScheduledTime = this.utils.currentPerfTime(); @@ -264,7 +264,11 @@ export class DataAggregator { this.aggregationMap.clearRangeChangedFlags(); this.onInterval(history, detectChanges); } else { - this.updateAggregatedData(aggType, data.data); + if (this.aggregationMap.aggMap[aggType]) { + this.updateAggregatedData(aggType, data.data); + } else { + this.processAggregatedData(aggType, data.data); + } if (history) { this.intervalScheduledTime = this.utils.currentPerfTime(); this.onInterval(history, detectChanges); @@ -338,49 +342,51 @@ export class DataAggregator { } this.dataBuffer[key.agg][key.key] = []; }); - for (const aggKeyString of Object.keys(this.aggregationMap.aggMap)) { - const aggKeyData = this.aggregationMap.aggMap[aggKeyString]; - const aggKey = DataAggregator.aggKeyFromString(aggKeyString); - const noAggregation = aggKey.agg === AggregationType.NONE; - let keyData = this.dataBuffer[aggKey.agg][aggKey.key]; - aggKeyData.forEach((aggData, aggTimestamp) => { - if (aggTimestamp < this.startTs) { - if (this.subsTw.aggregation.stateData && - (!this.lastPrevKvPairData[aggKeyString] || this.lastPrevKvPairData[aggKeyString][0] < aggTimestamp)) { - this.lastPrevKvPairData[aggKeyString] = [aggTimestamp, aggData.aggValue]; + for (const aggType of Object.keys(this.aggregationMap.aggMap)) { + for (const key of Object.keys(this.aggregationMap.aggMap[aggType])) { + const aggKeyData = this.aggregationMap.aggMap[aggType][key]; + const noAggregation = AggregationType[aggType] === AggregationType.NONE; + const aggKeyString = DataAggregator.aggKeyToString({agg: AggregationType[aggType], key}); + let keyData = this.dataBuffer[aggType][key]; + aggKeyData.forEach((aggData, aggTimestamp) => { + if (aggTimestamp < this.startTs) { + if (this.subsTw.aggregation.stateData && + (!this.lastPrevKvPairData[aggKeyString] || this.lastPrevKvPairData[aggKeyString][0] < aggTimestamp)) { + this.lastPrevKvPairData[aggKeyString] = [aggTimestamp, aggData.aggValue]; + } + aggKeyData.delete(aggTimestamp); + this.updatedData = true; + } else if (aggTimestamp < this.endTs || noAggregation) { + const kvPair: [number, any] = [aggTimestamp, aggData.aggValue]; + keyData.push(kvPair); } - aggKeyData.delete(aggTimestamp); - this.updatedData = true; - } else if (aggTimestamp < this.endTs || noAggregation) { - const kvPair: [number, any] = [aggTimestamp, aggData.aggValue]; - keyData.push(kvPair); - } - }); - keyData.sort((set1, set2) => set1[0] - set2[0]); - if (this.isFloatingLatestDataAgg) { - if (keyData.length) { - const timestamp = this.startTs + (this.endTs - this.startTs) / 2; - const first = keyData[0]; - const aggData: AggData = { - aggValue: aggKey.agg === AggregationType.COUNT ? 1 : first[1], - sum: first[1], - count: 1 - }; - const aggFunction = DataAggregator.getAggFunction(aggKey.agg); - for (let i = 1; i < keyData.length; i++) { - const kvPair = keyData[i]; - aggFunction(aggData, kvPair[1]); + }); + keyData.sort((set1, set2) => set1[0] - set2[0]); + if (this.isFloatingLatestDataAgg) { + if (keyData.length) { + const timestamp = this.startTs + (this.endTs - this.startTs) / 2; + const first = keyData[0]; + const aggData: AggData = { + aggValue: AggregationType[aggType] === AggregationType.COUNT ? 1 : first[1], + sum: first[1], + count: 1 + }; + const aggFunction = DataAggregator.getAggFunction(AggregationType[aggType]); + for (let i = 1; i < keyData.length; i++) { + const kvPair = keyData[i]; + aggFunction(aggData, kvPair[1]); + } + this.dataBuffer[aggType][key] = [[timestamp, aggData.aggValue]]; } - this.dataBuffer[aggKey.agg][aggKey.key] = [[timestamp, aggData.aggValue]]; + } else { + if (this.subsTw.aggregation.stateData) { + this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[aggKeyString])); + } + if (keyData.length > this.subsTw.aggregation.limit) { + keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit); + } + this.dataBuffer[aggType][key] = keyData; } - } else { - if (this.subsTw.aggregation.stateData) { - this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[aggKeyString])); - } - if (keyData.length > this.subsTw.aggregation.limit) { - keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit); - } - this.dataBuffer[aggKey.agg][aggKey.key] = keyData; } } return this.dataBuffer; @@ -414,16 +420,19 @@ export class DataAggregator { } } - private processAggregatedData(aggType: AggregationType, data: SubscriptionData): AggregationMap { + private processAggregatedData(aggType: AggregationType, data: SubscriptionData) { const isCount = aggType === AggregationType.COUNT; const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; - const aggregationMap = new AggregationMap(); + let aggDataByKey = this.aggregationMap.aggMap[aggType]; + if (!aggDataByKey) { + aggDataByKey = {}; + this.aggregationMap.aggMap[aggType] = aggDataByKey; + } for (const key of Object.keys(data)) { - const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); - let aggKeyData = aggregationMap.aggMap[aggKey]; + let aggKeyData = aggDataByKey[key]; if (!aggKeyData) { aggKeyData = new AggDataMap(); - aggregationMap.aggMap[aggKey] = aggKeyData; + aggDataByKey[key] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { @@ -438,18 +447,21 @@ export class DataAggregator { aggKeyData.set(tsKey, aggData); }); } - return aggregationMap; } private updateAggregatedData(aggType: AggregationType, data: SubscriptionData) { const isCount = aggType === AggregationType.COUNT; const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; + let aggDataByKey = this.aggregationMap.aggMap[aggType]; + if (!aggDataByKey) { + aggDataByKey = {}; + this.aggregationMap.aggMap[aggType] = aggDataByKey; + } for (const key of Object.keys(data)) { - const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); - let aggKeyData = this.aggregationMap.aggMap[aggKey]; + let aggKeyData = aggDataByKey[key]; if (!aggKeyData) { aggKeyData = new AggDataMap(); - this.aggregationMap.aggMap[aggKey] = aggKeyData; + aggDataByKey[key] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { diff --git a/ui-ngx/src/app/core/api/entity-data-subscription.ts b/ui-ngx/src/app/core/api/entity-data-subscription.ts index cb02acbe44..3378ee8bd2 100644 --- a/ui-ngx/src/app/core/api/entity-data-subscription.ts +++ b/ui-ngx/src/app/core/api/entity-data-subscription.ts @@ -606,7 +606,7 @@ export class EntityDataSubscription { if (this.aggTsValues && this.aggTsValues.length) { const aggLatestTimewindow = deepClone(this.subsTw); aggLatestTimewindow.aggregation.stateData = false; - const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval; + const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval && !this.history; if (!isFloatingLatestDataAgg) { aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow; } @@ -739,40 +739,6 @@ export class EntityDataSubscription { dataUpdatedCb: DataUpdatedCb) { if (this.entityDataSubscriptionOptions.type === widgetType.latest || this.entityDataSubscriptionOptions.type === widgetType.timeseries) { - if (entityData.latest) { - for (const type of Object.keys(entityData.latest)) { - const subscriptionData = this.toSubscriptionData(entityData.latest[type], false); - const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]); - if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) { - const keys: string[] = Object.keys(subscriptionData); - const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key)); - if (latestTsKeys.length) { - const latestTsSubsciptionData: SubscriptionData = {}; - for (const latestTsKey of latestTsKeys) { - latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key]; - } - this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true, - this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); - } - const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); - if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { - const dataAggregator = this.tsLatestDataAggregators[dataIndex]; - const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); - for (const aggTypeString of Object.keys(tsKeysByAggType)) { - const tsKeys = tsKeysByAggType[aggTypeString]; - const latestTsAggSubsciptionData: SubscriptionData = {}; - for (const tsKey of tsKeys) { - latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; - } - dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true); - } - } - } else { - this.onData(null, subscriptionData, dataKeyType, dataIndex, true, - this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); - } - } - } if (entityData.aggLatest) { for (const aggTypeString of Object.keys(entityData.aggLatest)) { const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false); @@ -819,6 +785,40 @@ export class EntityDataSubscription { } } } + if (entityData.latest) { + for (const type of Object.keys(entityData.latest)) { + const subscriptionData = this.toSubscriptionData(entityData.latest[type], false); + const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]); + if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) { + const keys: string[] = Object.keys(subscriptionData); + const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key)); + if (latestTsKeys.length) { + const latestTsSubsciptionData: SubscriptionData = {}; + for (const latestTsKey of latestTsKeys) { + latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key]; + } + this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + } + const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); + if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { + const dataAggregator = this.tsLatestDataAggregators[dataIndex]; + const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); + for (const aggTypeString of Object.keys(tsKeysByAggType)) { + const tsKeys = tsKeysByAggType[aggTypeString]; + const latestTsAggSubsciptionData: SubscriptionData = {}; + for (const tsKey of tsKeys) { + latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; + } + dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true); + } + } + } else { + this.onData(null, subscriptionData, dataKeyType, dataIndex, true, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + } + } + } } if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { const subscriptionData = this.toSubscriptionData(entityData.timeseries, true);