diff --git a/ui-ngx/src/app/core/api/data-aggregator.ts b/ui-ngx/src/app/core/api/data-aggregator.ts index 7485a4bc87..2f86b4a4ce 100644 --- a/ui-ngx/src/app/core/api/data-aggregator.ts +++ b/ui-ngx/src/app/core/api/data-aggregator.ts @@ -14,7 +14,7 @@ /// limitations under the License. /// -import { SubscriptionData, SubscriptionDataHolder } from '@app/shared/models/telemetry/telemetry.models'; +import { AggKey, SubscriptionData, SubscriptionDataHolder } from '@app/shared/models/telemetry/telemetry.models'; import { AggregationType, calculateIntervalComparisonEndTime, @@ -25,10 +25,10 @@ import { SubscriptionTimewindow } from '@shared/models/time/time.models'; import { UtilsService } from '@core/services/utils.service'; -import { deepClone, isNumber, isNumeric } from '@core/utils'; +import { deepClone, isDefinedAndNotNull, isNumber, isNumeric } from '@core/utils'; import Timeout = NodeJS.Timeout; -export declare type onAggregatedData = (data: SubscriptionData, detectChanges: boolean) => void; +export declare type onAggregatedData = (aggType: AggregationType, data: SubscriptionData, detectChanges: boolean) => void; interface AggData { count: number; @@ -67,12 +67,12 @@ class AggDataMap { } class AggregationMap { - aggMap: {[key: string]: AggDataMap} = {}; + aggMap: {[aggKey: string]: AggDataMap} = {}; detectRangeChanged(): boolean { let changed = false; - for (const key of Object.keys(this.aggMap)) { - const aggDataMap = this.aggMap[key]; + for (const aggKey of Object.keys(this.aggMap)) { + const aggDataMap = this.aggMap[aggKey]; if (aggDataMap.rangeChanged) { changed = true; aggDataMap.rangeChanged = false; @@ -82,8 +82,8 @@ class AggregationMap { } clearRangeChangedFlags() { - for (const key of Object.keys(this.aggMap)) { - this.aggMap[key].rangeChanged = false; + for (const aggKey of Object.keys(this.aggMap)) { + this.aggMap[aggKey].rangeChanged = false; } } } @@ -93,7 +93,7 @@ declare type AggFunction = (aggData: AggData, value?: any) => void; const avg: AggFunction = (aggData: AggData, value?: any) => { aggData.count++; if (isNumber(value)) { - aggData.sum += value; + aggData.sum = aggData.aggValue * (aggData.count - 1) + value; aggData.aggValue = aggData.sum / aggData.count; } else { aggData.aggValue = value; @@ -135,9 +135,27 @@ const none: AggFunction = (aggData: AggData, value?: any) => { export class DataAggregator { - private dataBuffer: SubscriptionData = {}; - private data: SubscriptionData; - private readonly lastPrevKvPairData: {[key: string]: [number, any]}; + constructor(private onDataCb: onAggregatedData, + private tsKeys: AggKey[], + private isLatestDataAgg: boolean, + private isFloatingLatestDataAgg: boolean, + private subsTw: SubscriptionTimewindow, + private utils: UtilsService, + private ignoreDataUpdateOnIntervalTick: boolean) { + this.tsKeys.forEach((key) => { + if (!this.dataBuffer[key.agg]) { + this.dataBuffer[key.agg] = {}; + } + this.dataBuffer[key.agg][key.key] = []; + }); + if (this.subsTw.aggregation.stateData) { + this.lastPrevKvPairData = {}; + } + } + + private dataBuffer: {[aggType: string]: SubscriptionData} = {}; + private data: {[aggType: string]: SubscriptionData}; + private readonly lastPrevKvPairData: {[aggKey: string]: [number, any]}; private aggregationMap: AggregationMap; @@ -145,9 +163,7 @@ export class DataAggregator { private resetPending = false; private updatedData = false; - private noAggregation = this.subsTw.aggregation.type === AggregationType.NONE; - private aggregationTimeout = Math.max(this.subsTw.aggregation.interval, 1000); - private readonly aggFunction: AggFunction; + private aggregationTimeout = this.isLatestDataAgg ? 1000 : Math.max(this.subsTw.aggregation.interval, 1000); private intervalTimeoutHandle: Timeout; private intervalScheduledTime: number; @@ -156,41 +172,43 @@ export class DataAggregator { private endTs: number; private elapsed: number; - constructor(private onDataCb: onAggregatedData, - private tsKeyNames: string[], - private subsTw: SubscriptionTimewindow, - private utils: UtilsService, - private ignoreDataUpdateOnIntervalTick: boolean) { - this.tsKeyNames.forEach((key) => { - this.dataBuffer[key] = []; - }); - if (this.subsTw.aggregation.stateData) { - this.lastPrevKvPairData = {}; + private static convertValue(val: string, noAggregation: boolean): any { + if (val && isNumeric(val) && (!noAggregation || noAggregation && Number(val).toString() === val)) { + return Number(val); } - switch (this.subsTw.aggregation.type) { + return val; + } + + private static getAggFunction(aggType: AggregationType): AggFunction { + switch (aggType) { case AggregationType.MIN: - this.aggFunction = min; - break; + return min; case AggregationType.MAX: - this.aggFunction = max; - break; + return max; case AggregationType.AVG: - this.aggFunction = avg; - break; + return avg; case AggregationType.SUM: - this.aggFunction = sum; - break; + return sum; case AggregationType.COUNT: - this.aggFunction = count; - break; + return count; case AggregationType.NONE: - this.aggFunction = none; - break; + return none; default: - this.aggFunction = avg; + return avg; } } + private static aggKeyToString(aggKey: AggKey): string { + return `${aggKey.key}_${aggKey.agg}`; + } + + private static aggKeyFromString(aggKeyString: string): AggKey { + const separatorIndex = aggKeyString.lastIndexOf('_'); + const key = aggKeyString.substring(0, separatorIndex); + const agg = AggregationType[aggKeyString.substring(separatorIndex + 1)]; + return { key, agg }; + } + public updateOnDataCb(newOnDataCb: onAggregatedData): onAggregatedData { const prevOnDataCb = this.onDataCb; this.onDataCb = newOnDataCb; @@ -206,7 +224,7 @@ export class DataAggregator { this.intervalScheduledTime = this.utils.currentPerfTime(); this.calculateStartEndTs(); this.elapsed = 0; - this.aggregationTimeout = Math.max(this.subsTw.aggregation.interval, 1000); + this.aggregationTimeout = this.isLatestDataAgg ? 1000 : Math.max(this.subsTw.aggregation.interval, 1000); this.resetPending = true; this.updatedData = false; this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), this.aggregationTimeout); @@ -220,7 +238,8 @@ export class DataAggregator { this.aggregationMap = null; } - public onData(data: SubscriptionDataHolder, update: boolean, history: boolean, detectChanges: boolean) { + public onData(aggType: AggregationType, + data: SubscriptionDataHolder, update: boolean, history: boolean, detectChanges: boolean) { this.updatedData = true; if (!this.dataReceived || this.resetPending) { let updateIntervalScheduledTime = true; @@ -235,9 +254,9 @@ export class DataAggregator { } if (update) { this.aggregationMap = new AggregationMap(); - this.updateAggregatedData(data.data); + this.updateAggregatedData(aggType, data.data); } else { - this.aggregationMap = this.processAggregatedData(data.data); + this.aggregationMap = this.processAggregatedData(aggType, data.data); } if (updateIntervalScheduledTime) { this.intervalScheduledTime = this.utils.currentPerfTime(); @@ -245,7 +264,7 @@ export class DataAggregator { this.aggregationMap.clearRangeChangedFlags(); this.onInterval(history, detectChanges); } else { - this.updateAggregatedData(data.data); + this.updateAggregatedData(aggType, data.data); if (history) { this.intervalScheduledTime = this.utils.currentPerfTime(); this.onInterval(history, detectChanges); @@ -283,9 +302,9 @@ export class DataAggregator { } const intervalTimeout = rangeChanged ? this.aggregationTimeout - this.elapsed : this.aggregationTimeout; if (!history) { - const delta = Math.floor(this.elapsed / this.subsTw.aggregation.interval); + const delta = Math.floor(this.elapsed / this.aggregationTimeout); if (delta || !this.data || rangeChanged) { - const tickTs = delta * this.subsTw.aggregation.interval; + const tickTs = delta * this.aggregationTimeout; if (this.subsTw.quickInterval) { const startEndTime = calculateIntervalStartEndTime(this.subsTw.quickInterval, this.subsTw.timezone); this.startTs = startEndTime[0] + this.subsTw.tsOffset; @@ -295,13 +314,15 @@ export class DataAggregator { this.endTs += tickTs; } this.data = this.updateData(); - this.elapsed = this.elapsed - delta * this.subsTw.aggregation.interval; + this.elapsed = this.elapsed - delta * this.aggregationTimeout; } } else { this.data = this.updateData(); } if (this.onDataCb && (!this.ignoreDataUpdateOnIntervalTick || this.updatedData)) { - this.onDataCb(this.data, detectChanges); + for (const aggType of Object.keys(this.data)) { + this.onDataCb(AggregationType[aggType], this.data[aggType], detectChanges); + } this.updatedData = false; } if (!history) { @@ -309,39 +330,63 @@ export class DataAggregator { } } - private updateData(): SubscriptionData { - this.tsKeyNames.forEach((key) => { - this.dataBuffer[key] = []; + private updateData(): {[aggType: string]: SubscriptionData} { + this.dataBuffer = {}; + this.tsKeys.forEach((key) => { + if (!this.dataBuffer[key.agg]) { + this.dataBuffer[key.agg] = {}; + } + this.dataBuffer[key.agg][key.key] = []; }); - for (const key of Object.keys(this.aggregationMap.aggMap)) { - const aggKeyData = this.aggregationMap.aggMap[key]; - let keyData = this.dataBuffer[key]; + for (const aggKeyString of Object.keys(this.aggregationMap.aggMap)) { + const aggKeyData = this.aggregationMap.aggMap[aggKeyString]; + const aggKey = DataAggregator.aggKeyFromString(aggKeyString); + const noAggregation = aggKey.agg === AggregationType.NONE; + let keyData = this.dataBuffer[aggKey.agg][aggKey.key]; aggKeyData.forEach((aggData, aggTimestamp) => { if (aggTimestamp < this.startTs) { if (this.subsTw.aggregation.stateData && - (!this.lastPrevKvPairData[key] || this.lastPrevKvPairData[key][0] < aggTimestamp)) { - this.lastPrevKvPairData[key] = [aggTimestamp, aggData.aggValue]; + (!this.lastPrevKvPairData[aggKeyString] || this.lastPrevKvPairData[aggKeyString][0] < aggTimestamp)) { + this.lastPrevKvPairData[aggKeyString] = [aggTimestamp, aggData.aggValue]; } aggKeyData.delete(aggTimestamp); this.updatedData = true; - } else if (aggTimestamp < this.endTs || this.noAggregation) { + } else if (aggTimestamp < this.endTs || noAggregation) { const kvPair: [number, any] = [aggTimestamp, aggData.aggValue]; keyData.push(kvPair); } }); keyData.sort((set1, set2) => set1[0] - set2[0]); - if (this.subsTw.aggregation.stateData) { - this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[key])); + if (this.isFloatingLatestDataAgg) { + if (keyData.length) { + const timestamp = this.startTs + (this.endTs - this.startTs) / 2; + const first = keyData[0]; + const aggData: AggData = { + aggValue: aggKey.agg === AggregationType.COUNT ? 1 : first[1], + sum: first[1], + count: 1 + }; + const aggFunction = DataAggregator.getAggFunction(aggKey.agg); + for (let i = 1; i < keyData.length; i++) { + const kvPair = keyData[i]; + aggFunction(aggData, kvPair[1]); + } + this.dataBuffer[aggKey.agg][aggKey.key] = [[timestamp, aggData.aggValue]]; + } + } else { + if (this.subsTw.aggregation.stateData) { + this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[aggKeyString])); + } + if (keyData.length > this.subsTw.aggregation.limit) { + keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit); + } + this.dataBuffer[aggKey.agg][aggKey.key] = keyData; } - if (keyData.length > this.subsTw.aggregation.limit) { - keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit); - } - this.dataBuffer[key] = keyData; } return this.dataBuffer; } - private updateStateBounds(keyData: [number, any][], lastPrevKvPair: [number, any]) { + private updateStateBounds(keyData: [number, any, number?][], lastPrevKvPair: [number, any]) { if (lastPrevKvPair) { lastPrevKvPair[0] = this.startTs; } @@ -369,66 +414,63 @@ export class DataAggregator { } } - private processAggregatedData(data: SubscriptionData): AggregationMap { - const isCount = this.subsTw.aggregation.type === AggregationType.COUNT; + private processAggregatedData(aggType: AggregationType, data: SubscriptionData): AggregationMap { + const isCount = aggType === AggregationType.COUNT; + const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; const aggregationMap = new AggregationMap(); for (const key of Object.keys(data)) { - let aggKeyData = aggregationMap.aggMap[key]; + const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); + let aggKeyData = aggregationMap.aggMap[aggKey]; if (!aggKeyData) { aggKeyData = new AggDataMap(); - aggregationMap.aggMap[key] = aggKeyData; + aggregationMap.aggMap[aggKey] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { const timestamp = kvPair[0]; - const value = this.convertValue(kvPair[1]); - const aggKey = timestamp; + const value = DataAggregator.convertValue(kvPair[1], noAggregation); + const tsKey = timestamp; const aggData = { - count: isCount ? value : 1, + count: isCount ? value : isDefinedAndNotNull(kvPair[2]) ? kvPair[2] : 1, sum: value, aggValue: value }; - aggKeyData.set(aggKey, aggData); + aggKeyData.set(tsKey, aggData); }); } return aggregationMap; } - private updateAggregatedData(data: SubscriptionData) { - const isCount = this.subsTw.aggregation.type === AggregationType.COUNT; + private updateAggregatedData(aggType: AggregationType, data: SubscriptionData) { + const isCount = aggType === AggregationType.COUNT; + const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg; for (const key of Object.keys(data)) { - let aggKeyData = this.aggregationMap.aggMap[key]; + const aggKey = DataAggregator.aggKeyToString({key, agg: aggType}); + let aggKeyData = this.aggregationMap.aggMap[aggKey]; if (!aggKeyData) { aggKeyData = new AggDataMap(); - this.aggregationMap.aggMap[key] = aggKeyData; + this.aggregationMap.aggMap[aggKey] = aggKeyData; } const keyData = data[key]; keyData.forEach((kvPair) => { const timestamp = kvPair[0]; - const value = this.convertValue(kvPair[1]); - const aggTimestamp = this.noAggregation ? timestamp : (this.startTs + + const value = DataAggregator.convertValue(kvPair[1], noAggregation); + const aggTimestamp = noAggregation ? timestamp : (this.startTs + Math.floor((timestamp - this.startTs) / this.subsTw.aggregation.interval) * this.subsTw.aggregation.interval + this.subsTw.aggregation.interval / 2); let aggData = aggKeyData.get(aggTimestamp); - if (!aggData) { + if (!aggData || this.isFloatingLatestDataAgg) { aggData = { - count: 1, + count: isDefinedAndNotNull(kvPair[2]) ? kvPair[2] : 1, sum: value, aggValue: isCount ? 1 : value }; aggKeyData.set(aggTimestamp, aggData); } else { - this.aggFunction(aggData, value); + DataAggregator.getAggFunction(aggType)(aggData, value); } }); } } - private convertValue(val: string): any { - if (val && isNumeric(val) && (!this.noAggregation || this.noAggregation && Number(val).toString() === val)) { - return Number(val); - } - return val; - } - } diff --git a/ui-ngx/src/app/core/api/entity-data-subscription.ts b/ui-ngx/src/app/core/api/entity-data-subscription.ts index f1da67c92c..cb02acbe44 100644 --- a/ui-ngx/src/app/core/api/entity-data-subscription.ts +++ b/ui-ngx/src/app/core/api/entity-data-subscription.ts @@ -47,6 +47,7 @@ import { EntityType } from '@shared/models/entity-type.models'; import { Observable, of, ReplaySubject, Subject } from 'rxjs'; import { EntityId } from '@shared/models/id/entity-id'; import Timeout = NodeJS.Timeout; +import _ from 'lodash'; declare type DataKeyFunction = (time: number, prevValue: any) => any; declare type DataKeyPostFunction = (time: number, value: any, prevValue: any, timePrev: number, prevOrigValue: any) => any; @@ -84,6 +85,12 @@ export interface EntityDataSubscriptionOptions { export class EntityDataSubscription { + constructor(private listener: EntityDataListener, + private telemetryService: TelemetryService, + private utils: UtilsService) { + this.initializeSubscription(); + } + private entityDataSubscriptionOptions = this.listener.subscriptionOptions; private datasourceType: DatasourceType = this.entityDataSubscriptionOptions.datasourceType; private history: boolean; @@ -104,6 +111,7 @@ export class EntityDataSubscription { private subsTw: SubscriptionTimewindow; private latestTsOffset: number; private dataAggregators: Array; + private tsLatestDataAggregators: Array; private dataKeys: {[key: string]: Array | SubscriptionDataKey} = {}; private datasourceData: {[index: number]: {[key: string]: DataSetHolder}}; private datasourceOrigData: {[index: number]: {[key: string]: DataSetHolder}}; @@ -119,10 +127,11 @@ export class EntityDataSubscription { private dataResolved = false; private started = false; - constructor(private listener: EntityDataListener, - private telemetryService: TelemetryService, - private utils: UtilsService) { - this.initializeSubscription(); + private static convertValue(val: string): any { + if (val && isNumeric(val) && Number(val).toString() === val) { + return Number(val); + } + return val; } private initializeSubscription() { @@ -184,6 +193,12 @@ export class EntityDataSubscription { }); this.dataAggregators = null; } + if (this.tsLatestDataAggregators) { + this.tsLatestDataAggregators.forEach((aggregator) => { + aggregator.destroy(); + }); + this.tsLatestDataAggregators = null; + } this.pageData = null; } @@ -192,12 +207,7 @@ export class EntityDataSubscription { if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { this.started = true; this.dataResolved = true; - this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; - this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset; - this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow && - isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); - this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && - isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); + this.prepareSubscriptionTimewindow(); } if (this.datasourceType === DatasourceType.entity) { const entityFields: Array = @@ -434,12 +444,7 @@ export class EntityDataSubscription { if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) { return; } - this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; - this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset; - this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow && - isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); - this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && - isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); + this.prepareSubscriptionTimewindow(); this.prepareData(); @@ -473,6 +478,15 @@ export class EntityDataSubscription { this.started = true; } + private prepareSubscriptionTimewindow() { + this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow; + this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset; + this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow && + isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow); + this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow && + isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs); + } + private prepareSubscriptionCommands(cmd: EntityDataCmd) { if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { if (this.tsFields.length > 0) { @@ -554,10 +568,16 @@ export class EntityDataSubscription { }); } this.dataAggregators = []; + if (this.tsLatestDataAggregators) { + this.tsLatestDataAggregators.forEach((aggregator) => { + aggregator.destroy(); + }); + } + this.tsLatestDataAggregators = []; this.resetData(); if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { - let tsKeyNames = []; + let tsKeyNames: string[] = []; if (this.datasourceType === DatasourceType.function) { for (const key of Object.keys(this.dataKeys)) { const dataKeysList = this.dataKeys[key] as Array; @@ -570,18 +590,32 @@ export class EntityDataSubscription { } else { tsKeyNames = this.tsFields ? this.tsFields.map(field => field.key) : []; } - for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { - if (tsKeyNames.length) { + const aggKeys: AggKey[] = tsKeyNames.map(key => ({key, agg: this.subsTw.aggregation.type})); + if (aggKeys.length) { + for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { if (this.datasourceType === DatasourceType.function) { - this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, - DataKeyType.function, dataIndex, this.notifyListener.bind(this)); + this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys, + false, false, DataKeyType.function, dataIndex, this.notifyListener.bind(this)); } else { - this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames, - DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); + this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys, + false, false, DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); } } } } + if (this.aggTsValues && this.aggTsValues.length) { + const aggLatestTimewindow = deepClone(this.subsTw); + aggLatestTimewindow.aggregation.stateData = false; + const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval; + if (!isFloatingLatestDataAgg) { + aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow; + } + for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) { + this.tsLatestDataAggregators[dataIndex] = this.createRealtimeDataAggregator(aggLatestTimewindow, this.aggTsValues, + true, isFloatingLatestDataAgg, + DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this)); + } + } } private resetData() { @@ -703,13 +737,87 @@ export class EntityDataSubscription { private processEntityData(entityData: EntityData, dataIndex: number, isUpdate: boolean, dataUpdatedCb: DataUpdatedCb) { - if ((this.entityDataSubscriptionOptions.type === widgetType.latest || - this.entityDataSubscriptionOptions.type === widgetType.timeseries) && entityData.latest) { - for (const type of Object.keys(entityData.latest)) { - const subscriptionData = this.toSubscriptionData(entityData.latest[type], false); - const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]); - this.onData(subscriptionData, dataKeyType, dataIndex, true, - this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + if (this.entityDataSubscriptionOptions.type === widgetType.latest || + this.entityDataSubscriptionOptions.type === widgetType.timeseries) { + if (entityData.latest) { + for (const type of Object.keys(entityData.latest)) { + const subscriptionData = this.toSubscriptionData(entityData.latest[type], false); + const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]); + if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) { + const keys: string[] = Object.keys(subscriptionData); + const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key)); + if (latestTsKeys.length) { + const latestTsSubsciptionData: SubscriptionData = {}; + for (const latestTsKey of latestTsKeys) { + latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key]; + } + this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + } + const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); + if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { + const dataAggregator = this.tsLatestDataAggregators[dataIndex]; + const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); + for (const aggTypeString of Object.keys(tsKeysByAggType)) { + const tsKeys = tsKeysByAggType[aggTypeString]; + const latestTsAggSubsciptionData: SubscriptionData = {}; + for (const tsKey of tsKeys) { + latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; + } + dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true); + } + } + } else { + this.onData(null, subscriptionData, dataKeyType, dataIndex, true, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + } + } + } + if (entityData.aggLatest) { + for (const aggTypeString of Object.keys(entityData.aggLatest)) { + const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false); + if (this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { + const dataAggregator = this.tsLatestDataAggregators[dataIndex]; + let prevDataCb; + if (!isUpdate) { + prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { + this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + }); + } + dataAggregator.onData(AggregationType[aggTypeString], {data: subscriptionData}, false, this.history, true); + if (prevDataCb) { + dataAggregator.updateOnDataCb(prevDataCb); + } + } + } + } + if (entityData.aggFloating) { + const subscriptionData = this.toSubscriptionData(entityData.aggFloating, true); + const keys: string[] = Object.keys(subscriptionData); + const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key)); + if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) { + const dataAggregator = this.tsLatestDataAggregators[dataIndex]; + let prevDataCb; + if (!isUpdate) { + prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { + this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges, + this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb); + }); + } + const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg); + for (const aggTypeString of Object.keys(tsKeysByAggType)) { + const tsKeys = tsKeysByAggType[aggTypeString]; + const latestTsAggSubsciptionData: SubscriptionData = {}; + for (const tsKey of tsKeys) { + latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key]; + } + dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, false, this.history, true); + } + if (prevDataCb) { + dataAggregator.updateOnDataCb(prevDataCb); + } + } } } if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { @@ -718,26 +826,29 @@ export class EntityDataSubscription { const dataAggregator = this.dataAggregators[dataIndex]; let prevDataCb; if (!isUpdate) { - prevDataCb = dataAggregator.updateOnDataCb((data, detectChanges) => { - this.onData(data, this.datasourceType === DatasourceType.function ? + prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => { + this.onData(null, data, this.datasourceType === DatasourceType.function ? DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, false, dataUpdatedCb); }); } - dataAggregator.onData({data: subscriptionData}, false, this.history, true); + dataAggregator.onData(this.subsTw.aggregation.type, {data: subscriptionData}, false, this.history, true); if (prevDataCb) { dataAggregator.updateOnDataCb(prevDataCb); } } else if (!this.history && !isUpdate) { - this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb); + this.onData(null, + subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb); } } } - private onData(sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, + private onData(aggType: AggregationType | undefined | null, + sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean, isTsLatest: boolean, dataUpdatedCb: DataUpdatedCb) { for (const keyName of Object.keys(sourceData)) { const keyData = sourceData[keyName]; - const key = `${keyName}_${type}${isTsLatest ? '_latest' : ''}`; + const aggSuffix = aggType && aggType !== AggregationType.NONE ? `_${aggType.toLowerCase()}` : ''; + const key = `${keyName}_${type}${aggSuffix}${isTsLatest ? '_latest' : ''}`; const dataKeyList = this.dataKeys[key] as Array; for (let keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) { const datasourceKey = `${key}_${keyIndex}`; @@ -768,28 +879,28 @@ export class EntityDataSubscription { keyData.forEach((keySeries) => { let series = keySeries; const time = series[0]; - this.datasourceOrigData[dataIndex][datasourceKey].data.push(series); - let value = this.convertValue(series[1]); + this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); + let value = EntityDataSubscription.convertValue(series[1]); if (dataKey.postFunc) { value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); } - prevOrigSeries = series; - series = [time, value]; - data.push(series); - prevSeries = series; + prevOrigSeries = [series[0], series[1]]; + series = [series[0], value]; + data.push([series[0], series[1]]); + prevSeries = [series[0], series[1]]; }); update = true; } else if (this.entityDataSubscriptionOptions.type === widgetType.latest || isTsLatest) { if (keyData.length > 0) { let series = keyData[0]; const time = series[0]; - this.datasourceOrigData[dataIndex][datasourceKey].data.push(series); - let value = this.convertValue(series[1]); + this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]); + let value = EntityDataSubscription.convertValue(series[1]); if (dataKey.postFunc) { value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]); } series = [time, value]; - data.push(series); + data.push([series[0], series[1]]); } update = true; } @@ -802,25 +913,18 @@ export class EntityDataSubscription { } } - private convertValue(val: string): any { - if (val && isNumeric(val) && Number(val).toString() === val) { - return Number(val); - } - return val; - } - private toSubscriptionData(sourceData: {[key: string]: TsValue | TsValue[]}, isTs: boolean): SubscriptionData { const subsData: SubscriptionData = {}; for (const keyName of Object.keys(sourceData)) { const values = sourceData[keyName]; - const dataSet: [number, any][] = []; + const dataSet: [number, any, number?][] = []; if (isTs) { (values as TsValue[]).forEach((keySeries) => { - dataSet.push([keySeries.ts, keySeries.value]); + dataSet.push([keySeries.ts, keySeries.value, keySeries.count]); }); } else { const tsValue = values as TsValue; - dataSet.push([tsValue.ts, tsValue.value]); + dataSet.push([tsValue.ts, tsValue.value, tsValue.count]); } subsData[keyName] = dataSet; } @@ -828,15 +932,20 @@ export class EntityDataSubscription { } private createRealtimeDataAggregator(subsTw: SubscriptionTimewindow, - tsKeyNames: Array, + tsKeys: Array, + isLatestDataAgg: boolean, + isFloatingLatestDataAgg: boolean, dataKeyType: DataKeyType, dataIndex: number, dataUpdatedCb: DataUpdatedCb): DataAggregator { return new DataAggregator( - (data, detectChanges) => { - this.onData(data, dataKeyType, dataIndex, detectChanges, false, dataUpdatedCb); + (aggType, data, detectChanges) => { + this.onData(isLatestDataAgg ? aggType : null, data, dataKeyType, dataIndex, detectChanges, + isLatestDataAgg && (this.entityDataSubscriptionOptions.type === widgetType.timeseries), dataUpdatedCb); }, - tsKeyNames, + tsKeys, + isLatestDataAgg, + isFloatingLatestDataAgg, subsTw, this.utils, this.entityDataSubscriptionOptions.ignoreDataUpdateOnIntervalTick @@ -959,7 +1068,8 @@ export class EntityDataSubscription { generatedData.data[`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, startTime, endTime); } if (this.dataAggregators && this.dataAggregators.length) { - this.dataAggregators[0].onData(generatedData, true, this.history, detectChanges); + this.dataAggregators[0].onData(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type, + generatedData, true, this.history, detectChanges); } if (!this.history) { diff --git a/ui-ngx/src/app/shared/models/query/query.models.ts b/ui-ngx/src/app/shared/models/query/query.models.ts index 41da414227..e55beaf7e4 100644 --- a/ui-ngx/src/app/shared/models/query/query.models.ts +++ b/ui-ngx/src/app/shared/models/query/query.models.ts @@ -738,7 +738,6 @@ export function createDefaultEntityDataPageLink(pageSize: number): EntityDataPag } export const singleEntityDataPageLink: EntityDataPageLink = createDefaultEntityDataPageLink(1); -export const defaultEntityDataPageLink: EntityDataPageLink = createDefaultEntityDataPageLink(1024); export interface EntityCountQuery { entityFilter: EntityFilter; @@ -769,7 +768,7 @@ export interface EntityData { latest: {[entityKeyType: string]: {[key: string]: TsValue}}; timeseries: {[key: string]: Array}; aggLatest?: {[aggType: string]: {[key: string]: TsValue}}; - aggFloating?: {[aggType: string]: {[key: string]: Array}}; + aggFloating?: {[key: string]: Array}; } export interface AlarmData extends AlarmInfo { diff --git a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts index e145fbdbbf..e519fd4a25 100644 --- a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts +++ b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts @@ -232,15 +232,6 @@ export class AlarmDataUnsubscribeCmd implements WebsocketCmd { } export class TelemetryPluginCmdsWrapper { - attrSubCmds: Array; - tsSubCmds: Array; - historyCmds: Array; - entityDataCmds: Array; - entityDataUnsubscribeCmds: Array; - alarmDataCmds: Array; - alarmDataUnsubscribeCmds: Array; - entityCountCmds: Array; - entityCountUnsubscribeCmds: Array; constructor() { this.attrSubCmds = []; @@ -253,6 +244,24 @@ export class TelemetryPluginCmdsWrapper { this.entityCountCmds = []; this.entityCountUnsubscribeCmds = []; } + attrSubCmds: Array; + tsSubCmds: Array; + historyCmds: Array; + entityDataCmds: Array; + entityDataUnsubscribeCmds: Array; + alarmDataCmds: Array; + alarmDataUnsubscribeCmds: Array; + entityCountCmds: Array; + entityCountUnsubscribeCmds: Array; + + private static popCmds(cmds: Array, leftCount: number): Array { + const toPublish = Math.min(cmds.length, leftCount); + if (toPublish > 0) { + return cmds.splice(0, toPublish); + } else { + return []; + } + } public hasCommands(): boolean { return this.tsSubCmds.length > 0 || @@ -281,38 +290,29 @@ export class TelemetryPluginCmdsWrapper { public preparePublishCommands(maxCommands: number): TelemetryPluginCmdsWrapper { const preparedWrapper = new TelemetryPluginCmdsWrapper(); let leftCount = maxCommands; - preparedWrapper.tsSubCmds = this.popCmds(this.tsSubCmds, leftCount); + preparedWrapper.tsSubCmds = TelemetryPluginCmdsWrapper.popCmds(this.tsSubCmds, leftCount); leftCount -= preparedWrapper.tsSubCmds.length; - preparedWrapper.historyCmds = this.popCmds(this.historyCmds, leftCount); + preparedWrapper.historyCmds = TelemetryPluginCmdsWrapper.popCmds(this.historyCmds, leftCount); leftCount -= preparedWrapper.historyCmds.length; - preparedWrapper.attrSubCmds = this.popCmds(this.attrSubCmds, leftCount); + preparedWrapper.attrSubCmds = TelemetryPluginCmdsWrapper.popCmds(this.attrSubCmds, leftCount); leftCount -= preparedWrapper.attrSubCmds.length; - preparedWrapper.entityDataCmds = this.popCmds(this.entityDataCmds, leftCount); + preparedWrapper.entityDataCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityDataCmds, leftCount); leftCount -= preparedWrapper.entityDataCmds.length; - preparedWrapper.entityDataUnsubscribeCmds = this.popCmds(this.entityDataUnsubscribeCmds, leftCount); + preparedWrapper.entityDataUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityDataUnsubscribeCmds, leftCount); leftCount -= preparedWrapper.entityDataUnsubscribeCmds.length; - preparedWrapper.alarmDataCmds = this.popCmds(this.alarmDataCmds, leftCount); + preparedWrapper.alarmDataCmds = TelemetryPluginCmdsWrapper.popCmds(this.alarmDataCmds, leftCount); leftCount -= preparedWrapper.alarmDataCmds.length; - preparedWrapper.alarmDataUnsubscribeCmds = this.popCmds(this.alarmDataUnsubscribeCmds, leftCount); + preparedWrapper.alarmDataUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.alarmDataUnsubscribeCmds, leftCount); leftCount -= preparedWrapper.alarmDataUnsubscribeCmds.length; - preparedWrapper.entityCountCmds = this.popCmds(this.entityCountCmds, leftCount); + preparedWrapper.entityCountCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityCountCmds, leftCount); leftCount -= preparedWrapper.entityCountCmds.length; - preparedWrapper.entityCountUnsubscribeCmds = this.popCmds(this.entityCountUnsubscribeCmds, leftCount); + preparedWrapper.entityCountUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityCountUnsubscribeCmds, leftCount); return preparedWrapper; } - - private popCmds(cmds: Array, leftCount: number): Array { - const toPublish = Math.min(cmds.length, leftCount); - if (toPublish > 0) { - return cmds.splice(0, toPublish); - } else { - return []; - } - } } export interface SubscriptionData { - [key: string]: [number, any][]; + [key: string]: [number, any, number?][]; } export interface SubscriptionDataHolder { @@ -454,16 +454,7 @@ export class EntityDataUpdate extends DataUpdate { super(msg); } - public prepareData(tsOffset: number) { - if (this.data) { - this.processEntityData(this.data.data, tsOffset); - } - if (this.update) { - this.processEntityData(this.update, tsOffset); - } - } - - private processEntityData(data: Array, tsOffset: number) { + private static processEntityData(data: Array, tsOffset: number) { for (const entityData of data) { if (entityData.timeseries) { for (const key of Object.keys(entityData.timeseries)) { @@ -491,28 +482,28 @@ export class EntityDataUpdate extends DataUpdate { } } } + + public prepareData(tsOffset: number) { + if (this.data) { + EntityDataUpdate.processEntityData(this.data.data, tsOffset); + } + if (this.update) { + EntityDataUpdate.processEntityData(this.update, tsOffset); + } + } } export class AlarmDataUpdate extends DataUpdate { - allowedEntities: number; - totalEntities: number; constructor(msg: AlarmDataUpdateMsg) { super(msg); this.allowedEntities = msg.allowedEntities; this.totalEntities = msg.totalEntities; } + allowedEntities: number; + totalEntities: number; - public prepareData(tsOffset: number) { - if (this.data) { - this.processAlarmData(this.data.data, tsOffset); - } - if (this.update) { - this.processAlarmData(this.update, tsOffset); - } - } - - private processAlarmData(data: Array, tsOffset: number) { + private static processAlarmData(data: Array, tsOffset: number) { for (const alarmData of data) { alarmData.createdTime += tsOffset; if (alarmData.ackTs) { @@ -544,6 +535,15 @@ export class AlarmDataUpdate extends DataUpdate { } } } + + public prepareData(tsOffset: number) { + if (this.data) { + AlarmDataUpdate.processAlarmData(this.data.data, tsOffset); + } + if (this.update) { + AlarmDataUpdate.processAlarmData(this.update, tsOffset); + } + } } export class EntityCountUpdate extends CmdUpdate {