UI: Aggregation improvements

This commit is contained in:
Igor Kulikov 2022-09-12 17:26:49 +03:00
parent 54936ba5d2
commit c6c40f50a8
2 changed files with 114 additions and 102 deletions

View File

@ -67,23 +67,30 @@ class AggDataMap {
} }
class AggregationMap { class AggregationMap {
aggMap: {[aggKey: string]: AggDataMap} = {}; aggMap: {[aggType: string]: {[key: string]: AggDataMap}} = {};
detectRangeChanged(): boolean { detectRangeChanged(): boolean {
let changed = false; let changed = false;
for (const aggKey of Object.keys(this.aggMap)) { for (const aggType of Object.keys(this.aggMap)) {
const aggDataMap = this.aggMap[aggKey]; const aggKeyMap = this.aggMap[aggType];
for (const key of Object.keys(aggKeyMap)) {
const aggDataMap = aggKeyMap[key];
if (aggDataMap.rangeChanged) { if (aggDataMap.rangeChanged) {
changed = true; changed = true;
aggDataMap.rangeChanged = false; aggDataMap.rangeChanged = false;
} }
} }
}
return changed; return changed;
} }
clearRangeChangedFlags() { clearRangeChangedFlags() {
for (const aggKey of Object.keys(this.aggMap)) { for (const aggType of Object.keys(this.aggMap)) {
this.aggMap[aggKey].rangeChanged = false; const aggKeyMap = this.aggMap[aggType];
for (const key of Object.keys(aggKeyMap)) {
const aggDataMap = aggKeyMap[key];
aggDataMap.rangeChanged = false;
}
} }
} }
} }
@ -202,13 +209,6 @@ export class DataAggregator {
return `${aggKey.key}_${aggKey.agg}`; return `${aggKey.key}_${aggKey.agg}`;
} }
private static aggKeyFromString(aggKeyString: string): AggKey {
const separatorIndex = aggKeyString.lastIndexOf('_');
const key = aggKeyString.substring(0, separatorIndex);
const agg = AggregationType[aggKeyString.substring(separatorIndex + 1)];
return { key, agg };
}
public updateOnDataCb(newOnDataCb: onAggregatedData): onAggregatedData { public updateOnDataCb(newOnDataCb: onAggregatedData): onAggregatedData {
const prevOnDataCb = this.onDataCb; const prevOnDataCb = this.onDataCb;
this.onDataCb = newOnDataCb; this.onDataCb = newOnDataCb;
@ -252,11 +252,11 @@ export class DataAggregator {
this.resetPending = false; this.resetPending = false;
updateIntervalScheduledTime = false; updateIntervalScheduledTime = false;
} }
if (update) {
this.aggregationMap = new AggregationMap(); this.aggregationMap = new AggregationMap();
if (update) {
this.updateAggregatedData(aggType, data.data); this.updateAggregatedData(aggType, data.data);
} else { } else {
this.aggregationMap = this.processAggregatedData(aggType, data.data); this.processAggregatedData(aggType, data.data);
} }
if (updateIntervalScheduledTime) { if (updateIntervalScheduledTime) {
this.intervalScheduledTime = this.utils.currentPerfTime(); this.intervalScheduledTime = this.utils.currentPerfTime();
@ -264,7 +264,11 @@ export class DataAggregator {
this.aggregationMap.clearRangeChangedFlags(); this.aggregationMap.clearRangeChangedFlags();
this.onInterval(history, detectChanges); this.onInterval(history, detectChanges);
} else { } else {
if (this.aggregationMap.aggMap[aggType]) {
this.updateAggregatedData(aggType, data.data); this.updateAggregatedData(aggType, data.data);
} else {
this.processAggregatedData(aggType, data.data);
}
if (history) { if (history) {
this.intervalScheduledTime = this.utils.currentPerfTime(); this.intervalScheduledTime = this.utils.currentPerfTime();
this.onInterval(history, detectChanges); this.onInterval(history, detectChanges);
@ -338,11 +342,12 @@ export class DataAggregator {
} }
this.dataBuffer[key.agg][key.key] = []; this.dataBuffer[key.agg][key.key] = [];
}); });
for (const aggKeyString of Object.keys(this.aggregationMap.aggMap)) { for (const aggType of Object.keys(this.aggregationMap.aggMap)) {
const aggKeyData = this.aggregationMap.aggMap[aggKeyString]; for (const key of Object.keys(this.aggregationMap.aggMap[aggType])) {
const aggKey = DataAggregator.aggKeyFromString(aggKeyString); const aggKeyData = this.aggregationMap.aggMap[aggType][key];
const noAggregation = aggKey.agg === AggregationType.NONE; const noAggregation = AggregationType[aggType] === AggregationType.NONE;
let keyData = this.dataBuffer[aggKey.agg][aggKey.key]; const aggKeyString = DataAggregator.aggKeyToString({agg: AggregationType[aggType], key});
let keyData = this.dataBuffer[aggType][key];
aggKeyData.forEach((aggData, aggTimestamp) => { aggKeyData.forEach((aggData, aggTimestamp) => {
if (aggTimestamp < this.startTs) { if (aggTimestamp < this.startTs) {
if (this.subsTw.aggregation.stateData && if (this.subsTw.aggregation.stateData &&
@ -362,16 +367,16 @@ export class DataAggregator {
const timestamp = this.startTs + (this.endTs - this.startTs) / 2; const timestamp = this.startTs + (this.endTs - this.startTs) / 2;
const first = keyData[0]; const first = keyData[0];
const aggData: AggData = { const aggData: AggData = {
aggValue: aggKey.agg === AggregationType.COUNT ? 1 : first[1], aggValue: AggregationType[aggType] === AggregationType.COUNT ? 1 : first[1],
sum: first[1], sum: first[1],
count: 1 count: 1
}; };
const aggFunction = DataAggregator.getAggFunction(aggKey.agg); const aggFunction = DataAggregator.getAggFunction(AggregationType[aggType]);
for (let i = 1; i < keyData.length; i++) { for (let i = 1; i < keyData.length; i++) {
const kvPair = keyData[i]; const kvPair = keyData[i];
aggFunction(aggData, kvPair[1]); aggFunction(aggData, kvPair[1]);
} }
this.dataBuffer[aggKey.agg][aggKey.key] = [[timestamp, aggData.aggValue]]; this.dataBuffer[aggType][key] = [[timestamp, aggData.aggValue]];
} }
} else { } else {
if (this.subsTw.aggregation.stateData) { if (this.subsTw.aggregation.stateData) {
@ -380,7 +385,8 @@ export class DataAggregator {
if (keyData.length > this.subsTw.aggregation.limit) { if (keyData.length > this.subsTw.aggregation.limit) {
keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit); keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit);
} }
this.dataBuffer[aggKey.agg][aggKey.key] = keyData; this.dataBuffer[aggType][key] = keyData;
}
} }
} }
return this.dataBuffer; return this.dataBuffer;
@ -414,16 +420,19 @@ export class DataAggregator {
} }
} }
private processAggregatedData(aggType: AggregationType, data: SubscriptionData): AggregationMap { private processAggregatedData(aggType: AggregationType, data: SubscriptionData) {
const isCount = aggType === AggregationType.COUNT; const isCount = aggType === AggregationType.COUNT;
const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg;
const aggregationMap = new AggregationMap(); let aggDataByKey = this.aggregationMap.aggMap[aggType];
if (!aggDataByKey) {
aggDataByKey = {};
this.aggregationMap.aggMap[aggType] = aggDataByKey;
}
for (const key of Object.keys(data)) { for (const key of Object.keys(data)) {
const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); let aggKeyData = aggDataByKey[key];
let aggKeyData = aggregationMap.aggMap[aggKey];
if (!aggKeyData) { if (!aggKeyData) {
aggKeyData = new AggDataMap(); aggKeyData = new AggDataMap();
aggregationMap.aggMap[aggKey] = aggKeyData; aggDataByKey[key] = aggKeyData;
} }
const keyData = data[key]; const keyData = data[key];
keyData.forEach((kvPair) => { keyData.forEach((kvPair) => {
@ -438,18 +447,21 @@ export class DataAggregator {
aggKeyData.set(tsKey, aggData); aggKeyData.set(tsKey, aggData);
}); });
} }
return aggregationMap;
} }
private updateAggregatedData(aggType: AggregationType, data: SubscriptionData) { private updateAggregatedData(aggType: AggregationType, data: SubscriptionData) {
const isCount = aggType === AggregationType.COUNT; const isCount = aggType === AggregationType.COUNT;
const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg;
let aggDataByKey = this.aggregationMap.aggMap[aggType];
if (!aggDataByKey) {
aggDataByKey = {};
this.aggregationMap.aggMap[aggType] = aggDataByKey;
}
for (const key of Object.keys(data)) { for (const key of Object.keys(data)) {
const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); let aggKeyData = aggDataByKey[key];
let aggKeyData = this.aggregationMap.aggMap[aggKey];
if (!aggKeyData) { if (!aggKeyData) {
aggKeyData = new AggDataMap(); aggKeyData = new AggDataMap();
this.aggregationMap.aggMap[aggKey] = aggKeyData; aggDataByKey[key] = aggKeyData;
} }
const keyData = data[key]; const keyData = data[key];
keyData.forEach((kvPair) => { keyData.forEach((kvPair) => {

View File

@ -606,7 +606,7 @@ export class EntityDataSubscription {
if (this.aggTsValues && this.aggTsValues.length) { if (this.aggTsValues && this.aggTsValues.length) {
const aggLatestTimewindow = deepClone(this.subsTw); const aggLatestTimewindow = deepClone(this.subsTw);
aggLatestTimewindow.aggregation.stateData = false; aggLatestTimewindow.aggregation.stateData = false;
const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval; const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval && !this.history;
if (!isFloatingLatestDataAgg) { if (!isFloatingLatestDataAgg) {
aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow; aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow;
} }
@ -739,40 +739,6 @@ export class EntityDataSubscription {
dataUpdatedCb: DataUpdatedCb) { dataUpdatedCb: DataUpdatedCb) {
if (this.entityDataSubscriptionOptions.type === widgetType.latest || if (this.entityDataSubscriptionOptions.type === widgetType.latest ||
this.entityDataSubscriptionOptions.type === widgetType.timeseries) { this.entityDataSubscriptionOptions.type === widgetType.timeseries) {
if (entityData.latest) {
for (const type of Object.keys(entityData.latest)) {
const subscriptionData = this.toSubscriptionData(entityData.latest[type], false);
const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]);
if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) {
const keys: string[] = Object.keys(subscriptionData);
const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key));
if (latestTsKeys.length) {
const latestTsSubsciptionData: SubscriptionData = {};
for (const latestTsKey of latestTsKeys) {
latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key];
}
this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true,
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
}
const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key));
if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) {
const dataAggregator = this.tsLatestDataAggregators[dataIndex];
const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg);
for (const aggTypeString of Object.keys(tsKeysByAggType)) {
const tsKeys = tsKeysByAggType[aggTypeString];
const latestTsAggSubsciptionData: SubscriptionData = {};
for (const tsKey of tsKeys) {
latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key];
}
dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true);
}
}
} else {
this.onData(null, subscriptionData, dataKeyType, dataIndex, true,
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
}
}
}
if (entityData.aggLatest) { if (entityData.aggLatest) {
for (const aggTypeString of Object.keys(entityData.aggLatest)) { for (const aggTypeString of Object.keys(entityData.aggLatest)) {
const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false); const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false);
@ -819,6 +785,40 @@ export class EntityDataSubscription {
} }
} }
} }
if (entityData.latest) {
for (const type of Object.keys(entityData.latest)) {
const subscriptionData = this.toSubscriptionData(entityData.latest[type], false);
const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]);
if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) {
const keys: string[] = Object.keys(subscriptionData);
const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key));
if (latestTsKeys.length) {
const latestTsSubsciptionData: SubscriptionData = {};
for (const latestTsKey of latestTsKeys) {
latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key];
}
this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true,
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
}
const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key));
if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) {
const dataAggregator = this.tsLatestDataAggregators[dataIndex];
const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg);
for (const aggTypeString of Object.keys(tsKeysByAggType)) {
const tsKeys = tsKeysByAggType[aggTypeString];
const latestTsAggSubsciptionData: SubscriptionData = {};
for (const tsKey of tsKeys) {
latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key];
}
dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true);
}
}
} else {
this.onData(null, subscriptionData, dataKeyType, dataIndex, true,
this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
}
}
}
} }
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) {
const subscriptionData = this.toSubscriptionData(entityData.timeseries, true); const subscriptionData = this.toSubscriptionData(entityData.timeseries, true);