UI: Implement TimeSeries to single latest value aggregation.
This commit is contained in:
		
							parent
							
								
									fa9f1b9f69
								
							
						
					
					
						commit
						9d6d82c5ac
					
				@ -14,7 +14,7 @@
 | 
			
		||||
/// limitations under the License.
 | 
			
		||||
///
 | 
			
		||||
 | 
			
		||||
import { SubscriptionData, SubscriptionDataHolder } from '@app/shared/models/telemetry/telemetry.models';
 | 
			
		||||
import { AggKey, SubscriptionData, SubscriptionDataHolder } from '@app/shared/models/telemetry/telemetry.models';
 | 
			
		||||
import {
 | 
			
		||||
  AggregationType,
 | 
			
		||||
  calculateIntervalComparisonEndTime,
 | 
			
		||||
@ -25,10 +25,10 @@ import {
 | 
			
		||||
  SubscriptionTimewindow
 | 
			
		||||
} from '@shared/models/time/time.models';
 | 
			
		||||
import { UtilsService } from '@core/services/utils.service';
 | 
			
		||||
import { deepClone, isNumber, isNumeric } from '@core/utils';
 | 
			
		||||
import { deepClone, isDefinedAndNotNull, isNumber, isNumeric } from '@core/utils';
 | 
			
		||||
import Timeout = NodeJS.Timeout;
 | 
			
		||||
 | 
			
		||||
export declare type onAggregatedData = (data: SubscriptionData, detectChanges: boolean) => void;
 | 
			
		||||
export declare type onAggregatedData = (aggType: AggregationType, data: SubscriptionData, detectChanges: boolean) => void;
 | 
			
		||||
 | 
			
		||||
interface AggData {
 | 
			
		||||
  count: number;
 | 
			
		||||
@ -67,12 +67,12 @@ class AggDataMap {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class AggregationMap {
 | 
			
		||||
  aggMap: {[key: string]: AggDataMap} = {};
 | 
			
		||||
  aggMap: {[aggKey: string]: AggDataMap} = {};
 | 
			
		||||
 | 
			
		||||
  detectRangeChanged(): boolean {
 | 
			
		||||
    let changed = false;
 | 
			
		||||
    for (const key of Object.keys(this.aggMap)) {
 | 
			
		||||
      const aggDataMap = this.aggMap[key];
 | 
			
		||||
    for (const aggKey of Object.keys(this.aggMap)) {
 | 
			
		||||
      const aggDataMap = this.aggMap[aggKey];
 | 
			
		||||
      if (aggDataMap.rangeChanged) {
 | 
			
		||||
        changed = true;
 | 
			
		||||
        aggDataMap.rangeChanged = false;
 | 
			
		||||
@ -82,8 +82,8 @@ class AggregationMap {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  clearRangeChangedFlags() {
 | 
			
		||||
    for (const key of Object.keys(this.aggMap)) {
 | 
			
		||||
      this.aggMap[key].rangeChanged = false;
 | 
			
		||||
    for (const aggKey of Object.keys(this.aggMap)) {
 | 
			
		||||
      this.aggMap[aggKey].rangeChanged = false;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -93,7 +93,7 @@ declare type AggFunction = (aggData: AggData, value?: any) => void;
 | 
			
		||||
const avg: AggFunction = (aggData: AggData, value?: any) => {
 | 
			
		||||
  aggData.count++;
 | 
			
		||||
  if (isNumber(value)) {
 | 
			
		||||
    aggData.sum += value;
 | 
			
		||||
    aggData.sum = aggData.aggValue * (aggData.count - 1) + value;
 | 
			
		||||
    aggData.aggValue = aggData.sum / aggData.count;
 | 
			
		||||
  } else {
 | 
			
		||||
    aggData.aggValue = value;
 | 
			
		||||
@ -135,9 +135,27 @@ const none: AggFunction = (aggData: AggData, value?: any) => {
 | 
			
		||||
 | 
			
		||||
export class DataAggregator {
 | 
			
		||||
 | 
			
		||||
  private dataBuffer: SubscriptionData = {};
 | 
			
		||||
  private data: SubscriptionData;
 | 
			
		||||
  private readonly lastPrevKvPairData: {[key: string]: [number, any]};
 | 
			
		||||
  constructor(private onDataCb: onAggregatedData,
 | 
			
		||||
              private tsKeys: AggKey[],
 | 
			
		||||
              private isLatestDataAgg: boolean,
 | 
			
		||||
              private isFloatingLatestDataAgg: boolean,
 | 
			
		||||
              private subsTw: SubscriptionTimewindow,
 | 
			
		||||
              private utils: UtilsService,
 | 
			
		||||
              private ignoreDataUpdateOnIntervalTick: boolean) {
 | 
			
		||||
    this.tsKeys.forEach((key) => {
 | 
			
		||||
      if (!this.dataBuffer[key.agg]) {
 | 
			
		||||
        this.dataBuffer[key.agg] = {};
 | 
			
		||||
      }
 | 
			
		||||
      this.dataBuffer[key.agg][key.key] = [];
 | 
			
		||||
    });
 | 
			
		||||
    if (this.subsTw.aggregation.stateData) {
 | 
			
		||||
      this.lastPrevKvPairData = {};
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private dataBuffer: {[aggType: string]: SubscriptionData} = {};
 | 
			
		||||
  private data: {[aggType: string]: SubscriptionData};
 | 
			
		||||
  private readonly lastPrevKvPairData: {[aggKey: string]: [number, any]};
 | 
			
		||||
 | 
			
		||||
  private aggregationMap: AggregationMap;
 | 
			
		||||
 | 
			
		||||
@ -145,9 +163,7 @@ export class DataAggregator {
 | 
			
		||||
  private resetPending = false;
 | 
			
		||||
  private updatedData = false;
 | 
			
		||||
 | 
			
		||||
  private noAggregation = this.subsTw.aggregation.type === AggregationType.NONE;
 | 
			
		||||
  private aggregationTimeout = Math.max(this.subsTw.aggregation.interval, 1000);
 | 
			
		||||
  private readonly aggFunction: AggFunction;
 | 
			
		||||
  private aggregationTimeout = this.isLatestDataAgg ? 1000 : Math.max(this.subsTw.aggregation.interval, 1000);
 | 
			
		||||
 | 
			
		||||
  private intervalTimeoutHandle: Timeout;
 | 
			
		||||
  private intervalScheduledTime: number;
 | 
			
		||||
@ -156,41 +172,43 @@ export class DataAggregator {
 | 
			
		||||
  private endTs: number;
 | 
			
		||||
  private elapsed: number;
 | 
			
		||||
 | 
			
		||||
  constructor(private onDataCb: onAggregatedData,
 | 
			
		||||
              private tsKeyNames: string[],
 | 
			
		||||
              private subsTw: SubscriptionTimewindow,
 | 
			
		||||
              private utils: UtilsService,
 | 
			
		||||
              private ignoreDataUpdateOnIntervalTick: boolean) {
 | 
			
		||||
    this.tsKeyNames.forEach((key) => {
 | 
			
		||||
      this.dataBuffer[key] = [];
 | 
			
		||||
    });
 | 
			
		||||
    if (this.subsTw.aggregation.stateData) {
 | 
			
		||||
      this.lastPrevKvPairData = {};
 | 
			
		||||
  private static convertValue(val: string, noAggregation: boolean): any {
 | 
			
		||||
    if (val && isNumeric(val) && (!noAggregation || noAggregation && Number(val).toString() === val)) {
 | 
			
		||||
      return Number(val);
 | 
			
		||||
    }
 | 
			
		||||
    switch (this.subsTw.aggregation.type) {
 | 
			
		||||
    return val;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private static getAggFunction(aggType: AggregationType): AggFunction {
 | 
			
		||||
    switch (aggType) {
 | 
			
		||||
      case AggregationType.MIN:
 | 
			
		||||
        this.aggFunction = min;
 | 
			
		||||
        break;
 | 
			
		||||
        return min;
 | 
			
		||||
      case AggregationType.MAX:
 | 
			
		||||
        this.aggFunction = max;
 | 
			
		||||
        break;
 | 
			
		||||
        return max;
 | 
			
		||||
      case AggregationType.AVG:
 | 
			
		||||
        this.aggFunction = avg;
 | 
			
		||||
        break;
 | 
			
		||||
        return avg;
 | 
			
		||||
      case AggregationType.SUM:
 | 
			
		||||
        this.aggFunction = sum;
 | 
			
		||||
        break;
 | 
			
		||||
        return sum;
 | 
			
		||||
      case AggregationType.COUNT:
 | 
			
		||||
        this.aggFunction = count;
 | 
			
		||||
        break;
 | 
			
		||||
        return count;
 | 
			
		||||
      case AggregationType.NONE:
 | 
			
		||||
        this.aggFunction = none;
 | 
			
		||||
        break;
 | 
			
		||||
        return none;
 | 
			
		||||
      default:
 | 
			
		||||
        this.aggFunction = avg;
 | 
			
		||||
        return avg;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private static aggKeyToString(aggKey: AggKey): string {
 | 
			
		||||
    return `${aggKey.key}_${aggKey.agg}`;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private static aggKeyFromString(aggKeyString: string): AggKey {
 | 
			
		||||
    const separatorIndex = aggKeyString.lastIndexOf('_');
 | 
			
		||||
    const key = aggKeyString.substring(0, separatorIndex);
 | 
			
		||||
    const agg = AggregationType[aggKeyString.substring(separatorIndex + 1)];
 | 
			
		||||
    return { key, agg };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public updateOnDataCb(newOnDataCb: onAggregatedData): onAggregatedData {
 | 
			
		||||
    const prevOnDataCb = this.onDataCb;
 | 
			
		||||
    this.onDataCb = newOnDataCb;
 | 
			
		||||
@ -206,7 +224,7 @@ export class DataAggregator {
 | 
			
		||||
    this.intervalScheduledTime = this.utils.currentPerfTime();
 | 
			
		||||
    this.calculateStartEndTs();
 | 
			
		||||
    this.elapsed = 0;
 | 
			
		||||
    this.aggregationTimeout = Math.max(this.subsTw.aggregation.interval, 1000);
 | 
			
		||||
    this.aggregationTimeout = this.isLatestDataAgg ? 1000 : Math.max(this.subsTw.aggregation.interval, 1000);
 | 
			
		||||
    this.resetPending = true;
 | 
			
		||||
    this.updatedData = false;
 | 
			
		||||
    this.intervalTimeoutHandle = setTimeout(this.onInterval.bind(this), this.aggregationTimeout);
 | 
			
		||||
@ -220,7 +238,8 @@ export class DataAggregator {
 | 
			
		||||
    this.aggregationMap = null;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public onData(data: SubscriptionDataHolder, update: boolean, history: boolean, detectChanges: boolean) {
 | 
			
		||||
  public onData(aggType: AggregationType,
 | 
			
		||||
                data: SubscriptionDataHolder, update: boolean, history: boolean, detectChanges: boolean) {
 | 
			
		||||
    this.updatedData = true;
 | 
			
		||||
    if (!this.dataReceived || this.resetPending) {
 | 
			
		||||
      let updateIntervalScheduledTime = true;
 | 
			
		||||
@ -235,9 +254,9 @@ export class DataAggregator {
 | 
			
		||||
      }
 | 
			
		||||
      if (update) {
 | 
			
		||||
        this.aggregationMap = new AggregationMap();
 | 
			
		||||
        this.updateAggregatedData(data.data);
 | 
			
		||||
        this.updateAggregatedData(aggType, data.data);
 | 
			
		||||
      } else {
 | 
			
		||||
        this.aggregationMap = this.processAggregatedData(data.data);
 | 
			
		||||
        this.aggregationMap = this.processAggregatedData(aggType, data.data);
 | 
			
		||||
      }
 | 
			
		||||
      if (updateIntervalScheduledTime) {
 | 
			
		||||
        this.intervalScheduledTime = this.utils.currentPerfTime();
 | 
			
		||||
@ -245,7 +264,7 @@ export class DataAggregator {
 | 
			
		||||
      this.aggregationMap.clearRangeChangedFlags();
 | 
			
		||||
      this.onInterval(history, detectChanges);
 | 
			
		||||
    } else {
 | 
			
		||||
      this.updateAggregatedData(data.data);
 | 
			
		||||
      this.updateAggregatedData(aggType, data.data);
 | 
			
		||||
      if (history) {
 | 
			
		||||
        this.intervalScheduledTime = this.utils.currentPerfTime();
 | 
			
		||||
        this.onInterval(history, detectChanges);
 | 
			
		||||
@ -283,9 +302,9 @@ export class DataAggregator {
 | 
			
		||||
    }
 | 
			
		||||
    const intervalTimeout = rangeChanged ? this.aggregationTimeout - this.elapsed : this.aggregationTimeout;
 | 
			
		||||
    if (!history) {
 | 
			
		||||
      const delta = Math.floor(this.elapsed / this.subsTw.aggregation.interval);
 | 
			
		||||
      const delta = Math.floor(this.elapsed / this.aggregationTimeout);
 | 
			
		||||
      if (delta || !this.data || rangeChanged) {
 | 
			
		||||
        const tickTs = delta * this.subsTw.aggregation.interval;
 | 
			
		||||
        const tickTs = delta * this.aggregationTimeout;
 | 
			
		||||
        if (this.subsTw.quickInterval) {
 | 
			
		||||
          const startEndTime = calculateIntervalStartEndTime(this.subsTw.quickInterval, this.subsTw.timezone);
 | 
			
		||||
          this.startTs = startEndTime[0] + this.subsTw.tsOffset;
 | 
			
		||||
@ -295,13 +314,15 @@ export class DataAggregator {
 | 
			
		||||
          this.endTs += tickTs;
 | 
			
		||||
        }
 | 
			
		||||
        this.data = this.updateData();
 | 
			
		||||
        this.elapsed = this.elapsed - delta * this.subsTw.aggregation.interval;
 | 
			
		||||
        this.elapsed = this.elapsed - delta * this.aggregationTimeout;
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      this.data = this.updateData();
 | 
			
		||||
    }
 | 
			
		||||
    if (this.onDataCb && (!this.ignoreDataUpdateOnIntervalTick || this.updatedData)) {
 | 
			
		||||
      this.onDataCb(this.data, detectChanges);
 | 
			
		||||
      for (const aggType of Object.keys(this.data)) {
 | 
			
		||||
        this.onDataCb(AggregationType[aggType], this.data[aggType], detectChanges);
 | 
			
		||||
      }
 | 
			
		||||
      this.updatedData = false;
 | 
			
		||||
    }
 | 
			
		||||
    if (!history) {
 | 
			
		||||
@ -309,39 +330,63 @@ export class DataAggregator {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateData(): SubscriptionData {
 | 
			
		||||
    this.tsKeyNames.forEach((key) => {
 | 
			
		||||
      this.dataBuffer[key] = [];
 | 
			
		||||
  private updateData(): {[aggType: string]: SubscriptionData} {
 | 
			
		||||
    this.dataBuffer = {};
 | 
			
		||||
    this.tsKeys.forEach((key) => {
 | 
			
		||||
      if (!this.dataBuffer[key.agg]) {
 | 
			
		||||
        this.dataBuffer[key.agg] = {};
 | 
			
		||||
      }
 | 
			
		||||
      this.dataBuffer[key.agg][key.key] = [];
 | 
			
		||||
    });
 | 
			
		||||
    for (const key of Object.keys(this.aggregationMap.aggMap)) {
 | 
			
		||||
      const aggKeyData = this.aggregationMap.aggMap[key];
 | 
			
		||||
      let keyData = this.dataBuffer[key];
 | 
			
		||||
    for (const aggKeyString of Object.keys(this.aggregationMap.aggMap)) {
 | 
			
		||||
      const aggKeyData = this.aggregationMap.aggMap[aggKeyString];
 | 
			
		||||
      const aggKey = DataAggregator.aggKeyFromString(aggKeyString);
 | 
			
		||||
      const noAggregation = aggKey.agg === AggregationType.NONE;
 | 
			
		||||
      let keyData = this.dataBuffer[aggKey.agg][aggKey.key];
 | 
			
		||||
      aggKeyData.forEach((aggData, aggTimestamp) => {
 | 
			
		||||
        if (aggTimestamp < this.startTs) {
 | 
			
		||||
          if (this.subsTw.aggregation.stateData &&
 | 
			
		||||
            (!this.lastPrevKvPairData[key] || this.lastPrevKvPairData[key][0] < aggTimestamp)) {
 | 
			
		||||
            this.lastPrevKvPairData[key] = [aggTimestamp, aggData.aggValue];
 | 
			
		||||
            (!this.lastPrevKvPairData[aggKeyString] || this.lastPrevKvPairData[aggKeyString][0] < aggTimestamp)) {
 | 
			
		||||
            this.lastPrevKvPairData[aggKeyString] = [aggTimestamp, aggData.aggValue];
 | 
			
		||||
          }
 | 
			
		||||
          aggKeyData.delete(aggTimestamp);
 | 
			
		||||
          this.updatedData = true;
 | 
			
		||||
        } else if (aggTimestamp < this.endTs || this.noAggregation) {
 | 
			
		||||
        } else if (aggTimestamp < this.endTs || noAggregation) {
 | 
			
		||||
          const kvPair: [number, any] = [aggTimestamp, aggData.aggValue];
 | 
			
		||||
          keyData.push(kvPair);
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
      keyData.sort((set1, set2) => set1[0] - set2[0]);
 | 
			
		||||
      if (this.subsTw.aggregation.stateData) {
 | 
			
		||||
        this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[key]));
 | 
			
		||||
      if (this.isFloatingLatestDataAgg) {
 | 
			
		||||
        if (keyData.length) {
 | 
			
		||||
          const timestamp = this.startTs + (this.endTs - this.startTs) / 2;
 | 
			
		||||
          const first = keyData[0];
 | 
			
		||||
          const aggData: AggData = {
 | 
			
		||||
            aggValue: aggKey.agg === AggregationType.COUNT ? 1 : first[1],
 | 
			
		||||
            sum: first[1],
 | 
			
		||||
            count: 1
 | 
			
		||||
          };
 | 
			
		||||
          const aggFunction = DataAggregator.getAggFunction(aggKey.agg);
 | 
			
		||||
          for (let i = 1; i < keyData.length; i++) {
 | 
			
		||||
            const kvPair = keyData[i];
 | 
			
		||||
            aggFunction(aggData, kvPair[1]);
 | 
			
		||||
          }
 | 
			
		||||
          this.dataBuffer[aggKey.agg][aggKey.key] = [[timestamp, aggData.aggValue]];
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        if (this.subsTw.aggregation.stateData) {
 | 
			
		||||
          this.updateStateBounds(keyData, deepClone(this.lastPrevKvPairData[aggKeyString]));
 | 
			
		||||
        }
 | 
			
		||||
        if (keyData.length > this.subsTw.aggregation.limit) {
 | 
			
		||||
          keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit);
 | 
			
		||||
        }
 | 
			
		||||
        this.dataBuffer[aggKey.agg][aggKey.key] = keyData;
 | 
			
		||||
      }
 | 
			
		||||
      if (keyData.length > this.subsTw.aggregation.limit) {
 | 
			
		||||
        keyData = keyData.slice(keyData.length - this.subsTw.aggregation.limit);
 | 
			
		||||
      }
 | 
			
		||||
      this.dataBuffer[key] = keyData;
 | 
			
		||||
    }
 | 
			
		||||
    return this.dataBuffer;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateStateBounds(keyData: [number, any][], lastPrevKvPair: [number, any]) {
 | 
			
		||||
  private updateStateBounds(keyData: [number, any, number?][], lastPrevKvPair: [number, any]) {
 | 
			
		||||
    if (lastPrevKvPair) {
 | 
			
		||||
      lastPrevKvPair[0] = this.startTs;
 | 
			
		||||
    }
 | 
			
		||||
@ -369,66 +414,63 @@ export class DataAggregator {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private processAggregatedData(data: SubscriptionData): AggregationMap {
 | 
			
		||||
    const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
 | 
			
		||||
  private processAggregatedData(aggType: AggregationType, data: SubscriptionData): AggregationMap {
 | 
			
		||||
    const isCount = aggType === AggregationType.COUNT;
 | 
			
		||||
    const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg;
 | 
			
		||||
    const aggregationMap = new AggregationMap();
 | 
			
		||||
    for (const key of Object.keys(data)) {
 | 
			
		||||
      let aggKeyData = aggregationMap.aggMap[key];
 | 
			
		||||
      const aggKey = DataAggregator.aggKeyToString({key, agg: aggType});
 | 
			
		||||
      let aggKeyData = aggregationMap.aggMap[aggKey];
 | 
			
		||||
      if (!aggKeyData) {
 | 
			
		||||
        aggKeyData = new AggDataMap();
 | 
			
		||||
        aggregationMap.aggMap[key] = aggKeyData;
 | 
			
		||||
        aggregationMap.aggMap[aggKey] = aggKeyData;
 | 
			
		||||
      }
 | 
			
		||||
      const keyData = data[key];
 | 
			
		||||
      keyData.forEach((kvPair) => {
 | 
			
		||||
        const timestamp = kvPair[0];
 | 
			
		||||
        const value = this.convertValue(kvPair[1]);
 | 
			
		||||
        const aggKey = timestamp;
 | 
			
		||||
        const value = DataAggregator.convertValue(kvPair[1], noAggregation);
 | 
			
		||||
        const tsKey = timestamp;
 | 
			
		||||
        const aggData = {
 | 
			
		||||
          count: isCount ? value : 1,
 | 
			
		||||
          count: isCount ? value : isDefinedAndNotNull(kvPair[2]) ? kvPair[2] : 1,
 | 
			
		||||
          sum: value,
 | 
			
		||||
          aggValue: value
 | 
			
		||||
        };
 | 
			
		||||
        aggKeyData.set(aggKey, aggData);
 | 
			
		||||
        aggKeyData.set(tsKey, aggData);
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
    return aggregationMap;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private updateAggregatedData(data: SubscriptionData) {
 | 
			
		||||
    const isCount = this.subsTw.aggregation.type === AggregationType.COUNT;
 | 
			
		||||
  private updateAggregatedData(aggType: AggregationType, data: SubscriptionData) {
 | 
			
		||||
    const isCount = aggType === AggregationType.COUNT;
 | 
			
		||||
    const noAggregation = aggType === AggregationType.NONE || this.isFloatingLatestDataAgg;
 | 
			
		||||
    for (const key of Object.keys(data)) {
 | 
			
		||||
      let aggKeyData = this.aggregationMap.aggMap[key];
 | 
			
		||||
      const aggKey = DataAggregator.aggKeyToString({key, agg: aggType});
 | 
			
		||||
      let aggKeyData = this.aggregationMap.aggMap[aggKey];
 | 
			
		||||
      if (!aggKeyData) {
 | 
			
		||||
        aggKeyData = new AggDataMap();
 | 
			
		||||
        this.aggregationMap.aggMap[key] = aggKeyData;
 | 
			
		||||
        this.aggregationMap.aggMap[aggKey] = aggKeyData;
 | 
			
		||||
      }
 | 
			
		||||
      const keyData = data[key];
 | 
			
		||||
      keyData.forEach((kvPair) => {
 | 
			
		||||
        const timestamp = kvPair[0];
 | 
			
		||||
        const value = this.convertValue(kvPair[1]);
 | 
			
		||||
        const aggTimestamp = this.noAggregation ? timestamp : (this.startTs +
 | 
			
		||||
        const value = DataAggregator.convertValue(kvPair[1], noAggregation);
 | 
			
		||||
        const aggTimestamp = noAggregation ? timestamp : (this.startTs +
 | 
			
		||||
          Math.floor((timestamp - this.startTs) / this.subsTw.aggregation.interval) *
 | 
			
		||||
          this.subsTw.aggregation.interval + this.subsTw.aggregation.interval / 2);
 | 
			
		||||
        let aggData = aggKeyData.get(aggTimestamp);
 | 
			
		||||
        if (!aggData) {
 | 
			
		||||
        if (!aggData || this.isFloatingLatestDataAgg) {
 | 
			
		||||
          aggData = {
 | 
			
		||||
            count: 1,
 | 
			
		||||
            count: isDefinedAndNotNull(kvPair[2]) ? kvPair[2] : 1,
 | 
			
		||||
            sum: value,
 | 
			
		||||
            aggValue: isCount ? 1 : value
 | 
			
		||||
          };
 | 
			
		||||
          aggKeyData.set(aggTimestamp, aggData);
 | 
			
		||||
        } else {
 | 
			
		||||
          this.aggFunction(aggData, value);
 | 
			
		||||
          DataAggregator.getAggFunction(aggType)(aggData, value);
 | 
			
		||||
        }
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private convertValue(val: string): any {
 | 
			
		||||
    if (val && isNumeric(val) && (!this.noAggregation || this.noAggregation && Number(val).toString() === val)) {
 | 
			
		||||
      return Number(val);
 | 
			
		||||
    }
 | 
			
		||||
    return val;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -47,6 +47,7 @@ import { EntityType } from '@shared/models/entity-type.models';
 | 
			
		||||
import { Observable, of, ReplaySubject, Subject } from 'rxjs';
 | 
			
		||||
import { EntityId } from '@shared/models/id/entity-id';
 | 
			
		||||
import Timeout = NodeJS.Timeout;
 | 
			
		||||
import _ from 'lodash';
 | 
			
		||||
 | 
			
		||||
declare type DataKeyFunction = (time: number, prevValue: any) => any;
 | 
			
		||||
declare type DataKeyPostFunction = (time: number, value: any, prevValue: any, timePrev: number, prevOrigValue: any) => any;
 | 
			
		||||
@ -84,6 +85,12 @@ export interface EntityDataSubscriptionOptions {
 | 
			
		||||
 | 
			
		||||
export class EntityDataSubscription {
 | 
			
		||||
 | 
			
		||||
  constructor(private listener: EntityDataListener,
 | 
			
		||||
              private telemetryService: TelemetryService,
 | 
			
		||||
              private utils: UtilsService) {
 | 
			
		||||
    this.initializeSubscription();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private entityDataSubscriptionOptions = this.listener.subscriptionOptions;
 | 
			
		||||
  private datasourceType: DatasourceType = this.entityDataSubscriptionOptions.datasourceType;
 | 
			
		||||
  private history: boolean;
 | 
			
		||||
@ -104,6 +111,7 @@ export class EntityDataSubscription {
 | 
			
		||||
  private subsTw: SubscriptionTimewindow;
 | 
			
		||||
  private latestTsOffset: number;
 | 
			
		||||
  private dataAggregators: Array<DataAggregator>;
 | 
			
		||||
  private tsLatestDataAggregators: Array<DataAggregator>;
 | 
			
		||||
  private dataKeys: {[key: string]: Array<SubscriptionDataKey> | SubscriptionDataKey} = {};
 | 
			
		||||
  private datasourceData: {[index: number]: {[key: string]: DataSetHolder}};
 | 
			
		||||
  private datasourceOrigData: {[index: number]: {[key: string]: DataSetHolder}};
 | 
			
		||||
@ -119,10 +127,11 @@ export class EntityDataSubscription {
 | 
			
		||||
  private dataResolved = false;
 | 
			
		||||
  private started = false;
 | 
			
		||||
 | 
			
		||||
  constructor(private listener: EntityDataListener,
 | 
			
		||||
              private telemetryService: TelemetryService,
 | 
			
		||||
              private utils: UtilsService) {
 | 
			
		||||
    this.initializeSubscription();
 | 
			
		||||
  private static convertValue(val: string): any {
 | 
			
		||||
    if (val && isNumeric(val) && Number(val).toString() === val) {
 | 
			
		||||
      return Number(val);
 | 
			
		||||
    }
 | 
			
		||||
    return val;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private initializeSubscription() {
 | 
			
		||||
@ -184,6 +193,12 @@ export class EntityDataSubscription {
 | 
			
		||||
      });
 | 
			
		||||
      this.dataAggregators = null;
 | 
			
		||||
    }
 | 
			
		||||
    if (this.tsLatestDataAggregators) {
 | 
			
		||||
      this.tsLatestDataAggregators.forEach((aggregator) => {
 | 
			
		||||
        aggregator.destroy();
 | 
			
		||||
      });
 | 
			
		||||
      this.tsLatestDataAggregators = null;
 | 
			
		||||
    }
 | 
			
		||||
    this.pageData = null;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -192,12 +207,7 @@ export class EntityDataSubscription {
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) {
 | 
			
		||||
      this.started = true;
 | 
			
		||||
      this.dataResolved = true;
 | 
			
		||||
      this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow;
 | 
			
		||||
      this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset;
 | 
			
		||||
      this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
        isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow);
 | 
			
		||||
      this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
        isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs);
 | 
			
		||||
      this.prepareSubscriptionTimewindow();
 | 
			
		||||
    }
 | 
			
		||||
    if (this.datasourceType === DatasourceType.entity) {
 | 
			
		||||
      const entityFields: Array<EntityKey> =
 | 
			
		||||
@ -434,12 +444,7 @@ export class EntityDataSubscription {
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.isPaginatedDataSubscription) {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow;
 | 
			
		||||
    this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset;
 | 
			
		||||
    this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
      isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow);
 | 
			
		||||
    this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
      isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs);
 | 
			
		||||
    this.prepareSubscriptionTimewindow();
 | 
			
		||||
 | 
			
		||||
    this.prepareData();
 | 
			
		||||
 | 
			
		||||
@ -473,6 +478,15 @@ export class EntityDataSubscription {
 | 
			
		||||
    this.started = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private prepareSubscriptionTimewindow() {
 | 
			
		||||
    this.subsTw = this.entityDataSubscriptionOptions.subscriptionTimewindow;
 | 
			
		||||
    this.latestTsOffset = this.entityDataSubscriptionOptions.latestTsOffset;
 | 
			
		||||
    this.history = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
      isObject(this.entityDataSubscriptionOptions.subscriptionTimewindow.fixedWindow);
 | 
			
		||||
    this.realtime = this.entityDataSubscriptionOptions.subscriptionTimewindow &&
 | 
			
		||||
      isDefinedAndNotNull(this.entityDataSubscriptionOptions.subscriptionTimewindow.realtimeWindowMs);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private prepareSubscriptionCommands(cmd: EntityDataCmd) {
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) {
 | 
			
		||||
      if (this.tsFields.length > 0) {
 | 
			
		||||
@ -554,10 +568,16 @@ export class EntityDataSubscription {
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
    this.dataAggregators = [];
 | 
			
		||||
    if (this.tsLatestDataAggregators) {
 | 
			
		||||
      this.tsLatestDataAggregators.forEach((aggregator) => {
 | 
			
		||||
        aggregator.destroy();
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
    this.tsLatestDataAggregators = [];
 | 
			
		||||
    this.resetData();
 | 
			
		||||
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) {
 | 
			
		||||
      let tsKeyNames = [];
 | 
			
		||||
      let tsKeyNames: string[] = [];
 | 
			
		||||
      if (this.datasourceType === DatasourceType.function) {
 | 
			
		||||
        for (const key of Object.keys(this.dataKeys)) {
 | 
			
		||||
          const dataKeysList = this.dataKeys[key] as Array<SubscriptionDataKey>;
 | 
			
		||||
@ -570,18 +590,32 @@ export class EntityDataSubscription {
 | 
			
		||||
      } else {
 | 
			
		||||
        tsKeyNames = this.tsFields ? this.tsFields.map(field => field.key) : [];
 | 
			
		||||
      }
 | 
			
		||||
      for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) {
 | 
			
		||||
        if (tsKeyNames.length) {
 | 
			
		||||
      const aggKeys: AggKey[] = tsKeyNames.map(key => ({key, agg: this.subsTw.aggregation.type}));
 | 
			
		||||
      if (aggKeys.length) {
 | 
			
		||||
        for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) {
 | 
			
		||||
          if (this.datasourceType === DatasourceType.function) {
 | 
			
		||||
            this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames,
 | 
			
		||||
              DataKeyType.function, dataIndex, this.notifyListener.bind(this));
 | 
			
		||||
            this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys,
 | 
			
		||||
              false, false, DataKeyType.function, dataIndex, this.notifyListener.bind(this));
 | 
			
		||||
          } else {
 | 
			
		||||
            this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, tsKeyNames,
 | 
			
		||||
              DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this));
 | 
			
		||||
            this.dataAggregators[dataIndex] = this.createRealtimeDataAggregator(this.subsTw, aggKeys,
 | 
			
		||||
              false, false, DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (this.aggTsValues && this.aggTsValues.length) {
 | 
			
		||||
      const aggLatestTimewindow = deepClone(this.subsTw);
 | 
			
		||||
      aggLatestTimewindow.aggregation.stateData = false;
 | 
			
		||||
      const isFloatingLatestDataAgg = !aggLatestTimewindow.quickInterval;
 | 
			
		||||
      if (!isFloatingLatestDataAgg) {
 | 
			
		||||
        aggLatestTimewindow.aggregation.interval = aggLatestTimewindow.aggregation.timeWindow;
 | 
			
		||||
      }
 | 
			
		||||
      for (let dataIndex = 0; dataIndex < this.pageData.data.length; dataIndex++) {
 | 
			
		||||
        this.tsLatestDataAggregators[dataIndex] = this.createRealtimeDataAggregator(aggLatestTimewindow, this.aggTsValues,
 | 
			
		||||
          true, isFloatingLatestDataAgg,
 | 
			
		||||
          DataKeyType.timeseries, dataIndex, this.notifyListener.bind(this));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private resetData() {
 | 
			
		||||
@ -703,13 +737,87 @@ export class EntityDataSubscription {
 | 
			
		||||
 | 
			
		||||
  private processEntityData(entityData: EntityData, dataIndex: number, isUpdate: boolean,
 | 
			
		||||
                            dataUpdatedCb: DataUpdatedCb) {
 | 
			
		||||
    if ((this.entityDataSubscriptionOptions.type === widgetType.latest ||
 | 
			
		||||
        this.entityDataSubscriptionOptions.type === widgetType.timeseries) && entityData.latest) {
 | 
			
		||||
      for (const type of Object.keys(entityData.latest)) {
 | 
			
		||||
        const subscriptionData = this.toSubscriptionData(entityData.latest[type], false);
 | 
			
		||||
        const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]);
 | 
			
		||||
        this.onData(subscriptionData, dataKeyType, dataIndex, true,
 | 
			
		||||
          this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.type === widgetType.latest ||
 | 
			
		||||
        this.entityDataSubscriptionOptions.type === widgetType.timeseries) {
 | 
			
		||||
      if (entityData.latest) {
 | 
			
		||||
        for (const type of Object.keys(entityData.latest)) {
 | 
			
		||||
          const subscriptionData = this.toSubscriptionData(entityData.latest[type], false);
 | 
			
		||||
          const dataKeyType = entityKeyTypeToDataKeyType(EntityKeyType[type]);
 | 
			
		||||
          if (isUpdate && EntityKeyType[type] === EntityKeyType.TIME_SERIES) {
 | 
			
		||||
            const keys: string[] = Object.keys(subscriptionData);
 | 
			
		||||
            const latestTsKeys = this.latestValues.filter(key => key.type === EntityKeyType.TIME_SERIES && keys.includes(key.key));
 | 
			
		||||
            if (latestTsKeys.length) {
 | 
			
		||||
              const latestTsSubsciptionData: SubscriptionData = {};
 | 
			
		||||
              for (const latestTsKey of latestTsKeys) {
 | 
			
		||||
                latestTsSubsciptionData[latestTsKey.key] = subscriptionData[latestTsKey.key];
 | 
			
		||||
              }
 | 
			
		||||
              this.onData(null, latestTsSubsciptionData, dataKeyType, dataIndex, true,
 | 
			
		||||
                this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
 | 
			
		||||
            }
 | 
			
		||||
            const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key));
 | 
			
		||||
            if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) {
 | 
			
		||||
              const dataAggregator = this.tsLatestDataAggregators[dataIndex];
 | 
			
		||||
              const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg);
 | 
			
		||||
              for (const aggTypeString of Object.keys(tsKeysByAggType)) {
 | 
			
		||||
                const tsKeys = tsKeysByAggType[aggTypeString];
 | 
			
		||||
                const latestTsAggSubsciptionData: SubscriptionData = {};
 | 
			
		||||
                for (const tsKey of tsKeys) {
 | 
			
		||||
                  latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key];
 | 
			
		||||
                }
 | 
			
		||||
                dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, true, false, true);
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
          } else {
 | 
			
		||||
            this.onData(null, subscriptionData, dataKeyType, dataIndex, true,
 | 
			
		||||
              this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      if (entityData.aggLatest) {
 | 
			
		||||
        for (const aggTypeString of Object.keys(entityData.aggLatest)) {
 | 
			
		||||
          const subscriptionData = this.toSubscriptionData(entityData.aggLatest[aggTypeString], false);
 | 
			
		||||
          if (this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) {
 | 
			
		||||
            const dataAggregator = this.tsLatestDataAggregators[dataIndex];
 | 
			
		||||
            let prevDataCb;
 | 
			
		||||
            if (!isUpdate) {
 | 
			
		||||
              prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => {
 | 
			
		||||
                this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges,
 | 
			
		||||
                  this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
 | 
			
		||||
              });
 | 
			
		||||
            }
 | 
			
		||||
            dataAggregator.onData(AggregationType[aggTypeString], {data: subscriptionData}, false, this.history, true);
 | 
			
		||||
            if (prevDataCb) {
 | 
			
		||||
              dataAggregator.updateOnDataCb(prevDataCb);
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      if (entityData.aggFloating) {
 | 
			
		||||
        const subscriptionData = this.toSubscriptionData(entityData.aggFloating, true);
 | 
			
		||||
        const keys: string[] = Object.keys(subscriptionData);
 | 
			
		||||
        const aggTsKeys = this.aggTsValues.filter(key => keys.includes(key.key));
 | 
			
		||||
        if (aggTsKeys.length && this.tsLatestDataAggregators && this.tsLatestDataAggregators[dataIndex]) {
 | 
			
		||||
          const dataAggregator = this.tsLatestDataAggregators[dataIndex];
 | 
			
		||||
          let prevDataCb;
 | 
			
		||||
          if (!isUpdate) {
 | 
			
		||||
            prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => {
 | 
			
		||||
              this.onData(aggType, data, DataKeyType.timeseries, dataIndex, detectChanges,
 | 
			
		||||
                this.entityDataSubscriptionOptions.type === widgetType.timeseries, dataUpdatedCb);
 | 
			
		||||
            });
 | 
			
		||||
          }
 | 
			
		||||
          const tsKeysByAggType = _.groupBy(aggTsKeys, value => value.agg);
 | 
			
		||||
          for (const aggTypeString of Object.keys(tsKeysByAggType)) {
 | 
			
		||||
            const tsKeys = tsKeysByAggType[aggTypeString];
 | 
			
		||||
            const latestTsAggSubsciptionData: SubscriptionData = {};
 | 
			
		||||
            for (const tsKey of tsKeys) {
 | 
			
		||||
              latestTsAggSubsciptionData[tsKey.key] = subscriptionData[tsKey.key];
 | 
			
		||||
            }
 | 
			
		||||
            dataAggregator.onData(AggregationType[aggTypeString], {data: latestTsAggSubsciptionData}, false, this.history, true);
 | 
			
		||||
          }
 | 
			
		||||
          if (prevDataCb) {
 | 
			
		||||
            dataAggregator.updateOnDataCb(prevDataCb);
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) {
 | 
			
		||||
@ -718,26 +826,29 @@ export class EntityDataSubscription {
 | 
			
		||||
        const dataAggregator = this.dataAggregators[dataIndex];
 | 
			
		||||
        let prevDataCb;
 | 
			
		||||
        if (!isUpdate) {
 | 
			
		||||
          prevDataCb = dataAggregator.updateOnDataCb((data, detectChanges) => {
 | 
			
		||||
            this.onData(data, this.datasourceType === DatasourceType.function ?
 | 
			
		||||
          prevDataCb = dataAggregator.updateOnDataCb((aggType, data, detectChanges) => {
 | 
			
		||||
            this.onData(null, data, this.datasourceType === DatasourceType.function ?
 | 
			
		||||
              DataKeyType.function : DataKeyType.timeseries, dataIndex, detectChanges, false, dataUpdatedCb);
 | 
			
		||||
          });
 | 
			
		||||
        }
 | 
			
		||||
        dataAggregator.onData({data: subscriptionData}, false, this.history, true);
 | 
			
		||||
        dataAggregator.onData(this.subsTw.aggregation.type, {data: subscriptionData}, false, this.history, true);
 | 
			
		||||
        if (prevDataCb) {
 | 
			
		||||
          dataAggregator.updateOnDataCb(prevDataCb);
 | 
			
		||||
        }
 | 
			
		||||
      } else if (!this.history && !isUpdate) {
 | 
			
		||||
        this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb);
 | 
			
		||||
        this.onData(null,
 | 
			
		||||
          subscriptionData, DataKeyType.timeseries, dataIndex, true, false, dataUpdatedCb);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private onData(sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean,
 | 
			
		||||
  private onData(aggType: AggregationType | undefined | null,
 | 
			
		||||
                 sourceData: SubscriptionData, type: DataKeyType, dataIndex: number, detectChanges: boolean,
 | 
			
		||||
                 isTsLatest: boolean, dataUpdatedCb: DataUpdatedCb) {
 | 
			
		||||
    for (const keyName of Object.keys(sourceData)) {
 | 
			
		||||
      const keyData = sourceData[keyName];
 | 
			
		||||
      const key = `${keyName}_${type}${isTsLatest ? '_latest' : ''}`;
 | 
			
		||||
      const aggSuffix = aggType && aggType !== AggregationType.NONE ? `_${aggType.toLowerCase()}` : '';
 | 
			
		||||
      const key = `${keyName}_${type}${aggSuffix}${isTsLatest ? '_latest' : ''}`;
 | 
			
		||||
      const dataKeyList = this.dataKeys[key] as Array<SubscriptionDataKey>;
 | 
			
		||||
      for (let keyIndex = 0; dataKeyList && keyIndex < dataKeyList.length; keyIndex++) {
 | 
			
		||||
        const datasourceKey = `${key}_${keyIndex}`;
 | 
			
		||||
@ -768,28 +879,28 @@ export class EntityDataSubscription {
 | 
			
		||||
            keyData.forEach((keySeries) => {
 | 
			
		||||
              let series = keySeries;
 | 
			
		||||
              const time = series[0];
 | 
			
		||||
              this.datasourceOrigData[dataIndex][datasourceKey].data.push(series);
 | 
			
		||||
              let value = this.convertValue(series[1]);
 | 
			
		||||
              this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]);
 | 
			
		||||
              let value = EntityDataSubscription.convertValue(series[1]);
 | 
			
		||||
              if (dataKey.postFunc) {
 | 
			
		||||
                value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]);
 | 
			
		||||
              }
 | 
			
		||||
              prevOrigSeries = series;
 | 
			
		||||
              series = [time, value];
 | 
			
		||||
              data.push(series);
 | 
			
		||||
              prevSeries = series;
 | 
			
		||||
              prevOrigSeries = [series[0], series[1]];
 | 
			
		||||
              series = [series[0], value];
 | 
			
		||||
              data.push([series[0], series[1]]);
 | 
			
		||||
              prevSeries =  [series[0], series[1]];
 | 
			
		||||
            });
 | 
			
		||||
            update = true;
 | 
			
		||||
          } else if (this.entityDataSubscriptionOptions.type === widgetType.latest || isTsLatest) {
 | 
			
		||||
            if (keyData.length > 0) {
 | 
			
		||||
              let series = keyData[0];
 | 
			
		||||
              const time = series[0];
 | 
			
		||||
              this.datasourceOrigData[dataIndex][datasourceKey].data.push(series);
 | 
			
		||||
              let value = this.convertValue(series[1]);
 | 
			
		||||
              this.datasourceOrigData[dataIndex][datasourceKey].data.push([series[0], series[1]]);
 | 
			
		||||
              let value = EntityDataSubscription.convertValue(series[1]);
 | 
			
		||||
              if (dataKey.postFunc) {
 | 
			
		||||
                value = dataKey.postFunc(time, value, prevSeries[1], prevOrigSeries[0], prevOrigSeries[1]);
 | 
			
		||||
              }
 | 
			
		||||
              series = [time, value];
 | 
			
		||||
              data.push(series);
 | 
			
		||||
              data.push([series[0], series[1]]);
 | 
			
		||||
            }
 | 
			
		||||
            update = true;
 | 
			
		||||
          }
 | 
			
		||||
@ -802,25 +913,18 @@ export class EntityDataSubscription {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private convertValue(val: string): any {
 | 
			
		||||
    if (val && isNumeric(val) && Number(val).toString() === val) {
 | 
			
		||||
      return Number(val);
 | 
			
		||||
    }
 | 
			
		||||
    return val;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private toSubscriptionData(sourceData: {[key: string]: TsValue | TsValue[]}, isTs: boolean): SubscriptionData {
 | 
			
		||||
    const subsData: SubscriptionData = {};
 | 
			
		||||
    for (const keyName of Object.keys(sourceData)) {
 | 
			
		||||
      const values = sourceData[keyName];
 | 
			
		||||
      const dataSet: [number, any][] = [];
 | 
			
		||||
      const dataSet: [number, any, number?][] = [];
 | 
			
		||||
      if (isTs) {
 | 
			
		||||
        (values as TsValue[]).forEach((keySeries) => {
 | 
			
		||||
          dataSet.push([keySeries.ts, keySeries.value]);
 | 
			
		||||
          dataSet.push([keySeries.ts, keySeries.value, keySeries.count]);
 | 
			
		||||
        });
 | 
			
		||||
      } else {
 | 
			
		||||
        const tsValue = values as TsValue;
 | 
			
		||||
        dataSet.push([tsValue.ts, tsValue.value]);
 | 
			
		||||
        dataSet.push([tsValue.ts, tsValue.value, tsValue.count]);
 | 
			
		||||
      }
 | 
			
		||||
      subsData[keyName] = dataSet;
 | 
			
		||||
    }
 | 
			
		||||
@ -828,15 +932,20 @@ export class EntityDataSubscription {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private createRealtimeDataAggregator(subsTw: SubscriptionTimewindow,
 | 
			
		||||
                                       tsKeyNames: Array<string>,
 | 
			
		||||
                                       tsKeys: Array<AggKey>,
 | 
			
		||||
                                       isLatestDataAgg: boolean,
 | 
			
		||||
                                       isFloatingLatestDataAgg: boolean,
 | 
			
		||||
                                       dataKeyType: DataKeyType,
 | 
			
		||||
                                       dataIndex: number,
 | 
			
		||||
                                       dataUpdatedCb: DataUpdatedCb): DataAggregator {
 | 
			
		||||
    return new DataAggregator(
 | 
			
		||||
      (data, detectChanges) => {
 | 
			
		||||
        this.onData(data, dataKeyType, dataIndex, detectChanges, false, dataUpdatedCb);
 | 
			
		||||
      (aggType, data, detectChanges) => {
 | 
			
		||||
        this.onData(isLatestDataAgg ? aggType : null, data, dataKeyType, dataIndex, detectChanges,
 | 
			
		||||
          isLatestDataAgg && (this.entityDataSubscriptionOptions.type === widgetType.timeseries), dataUpdatedCb);
 | 
			
		||||
      },
 | 
			
		||||
      tsKeyNames,
 | 
			
		||||
      tsKeys,
 | 
			
		||||
      isLatestDataAgg,
 | 
			
		||||
      isFloatingLatestDataAgg,
 | 
			
		||||
      subsTw,
 | 
			
		||||
      this.utils,
 | 
			
		||||
      this.entityDataSubscriptionOptions.ignoreDataUpdateOnIntervalTick
 | 
			
		||||
@ -959,7 +1068,8 @@ export class EntityDataSubscription {
 | 
			
		||||
      generatedData.data[`${dataKey.name}_${dataKey.index}`] = this.generateSeries(dataKey, startTime, endTime);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.dataAggregators && this.dataAggregators.length) {
 | 
			
		||||
      this.dataAggregators[0].onData(generatedData, true, this.history, detectChanges);
 | 
			
		||||
      this.dataAggregators[0].onData(this.entityDataSubscriptionOptions.subscriptionTimewindow.aggregation.type,
 | 
			
		||||
         generatedData, true, this.history, detectChanges);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!this.history) {
 | 
			
		||||
 | 
			
		||||
@ -738,7 +738,6 @@ export function createDefaultEntityDataPageLink(pageSize: number): EntityDataPag
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export const singleEntityDataPageLink: EntityDataPageLink = createDefaultEntityDataPageLink(1);
 | 
			
		||||
export const defaultEntityDataPageLink: EntityDataPageLink = createDefaultEntityDataPageLink(1024);
 | 
			
		||||
 | 
			
		||||
export interface EntityCountQuery {
 | 
			
		||||
  entityFilter: EntityFilter;
 | 
			
		||||
@ -769,7 +768,7 @@ export interface EntityData {
 | 
			
		||||
  latest: {[entityKeyType: string]: {[key: string]: TsValue}};
 | 
			
		||||
  timeseries: {[key: string]: Array<TsValue>};
 | 
			
		||||
  aggLatest?: {[aggType: string]: {[key: string]: TsValue}};
 | 
			
		||||
  aggFloating?: {[aggType: string]: {[key: string]: Array<TsValue>}};
 | 
			
		||||
  aggFloating?: {[key: string]: Array<TsValue>};
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface AlarmData extends AlarmInfo {
 | 
			
		||||
 | 
			
		||||
@ -232,15 +232,6 @@ export class AlarmDataUnsubscribeCmd implements WebsocketCmd {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export class TelemetryPluginCmdsWrapper {
 | 
			
		||||
  attrSubCmds: Array<AttributesSubscriptionCmd>;
 | 
			
		||||
  tsSubCmds: Array<TimeseriesSubscriptionCmd>;
 | 
			
		||||
  historyCmds: Array<GetHistoryCmd>;
 | 
			
		||||
  entityDataCmds: Array<EntityDataCmd>;
 | 
			
		||||
  entityDataUnsubscribeCmds: Array<EntityDataUnsubscribeCmd>;
 | 
			
		||||
  alarmDataCmds: Array<AlarmDataCmd>;
 | 
			
		||||
  alarmDataUnsubscribeCmds: Array<AlarmDataUnsubscribeCmd>;
 | 
			
		||||
  entityCountCmds: Array<EntityCountCmd>;
 | 
			
		||||
  entityCountUnsubscribeCmds: Array<EntityCountUnsubscribeCmd>;
 | 
			
		||||
 | 
			
		||||
  constructor() {
 | 
			
		||||
    this.attrSubCmds = [];
 | 
			
		||||
@ -253,6 +244,24 @@ export class TelemetryPluginCmdsWrapper {
 | 
			
		||||
    this.entityCountCmds = [];
 | 
			
		||||
    this.entityCountUnsubscribeCmds = [];
 | 
			
		||||
  }
 | 
			
		||||
  attrSubCmds: Array<AttributesSubscriptionCmd>;
 | 
			
		||||
  tsSubCmds: Array<TimeseriesSubscriptionCmd>;
 | 
			
		||||
  historyCmds: Array<GetHistoryCmd>;
 | 
			
		||||
  entityDataCmds: Array<EntityDataCmd>;
 | 
			
		||||
  entityDataUnsubscribeCmds: Array<EntityDataUnsubscribeCmd>;
 | 
			
		||||
  alarmDataCmds: Array<AlarmDataCmd>;
 | 
			
		||||
  alarmDataUnsubscribeCmds: Array<AlarmDataUnsubscribeCmd>;
 | 
			
		||||
  entityCountCmds: Array<EntityCountCmd>;
 | 
			
		||||
  entityCountUnsubscribeCmds: Array<EntityCountUnsubscribeCmd>;
 | 
			
		||||
 | 
			
		||||
  private static popCmds<T>(cmds: Array<T>, leftCount: number): Array<T> {
 | 
			
		||||
    const toPublish = Math.min(cmds.length, leftCount);
 | 
			
		||||
    if (toPublish > 0) {
 | 
			
		||||
      return cmds.splice(0, toPublish);
 | 
			
		||||
    } else {
 | 
			
		||||
      return [];
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public hasCommands(): boolean {
 | 
			
		||||
    return this.tsSubCmds.length > 0 ||
 | 
			
		||||
@ -281,38 +290,29 @@ export class TelemetryPluginCmdsWrapper {
 | 
			
		||||
  public preparePublishCommands(maxCommands: number): TelemetryPluginCmdsWrapper {
 | 
			
		||||
    const preparedWrapper = new TelemetryPluginCmdsWrapper();
 | 
			
		||||
    let leftCount = maxCommands;
 | 
			
		||||
    preparedWrapper.tsSubCmds = this.popCmds(this.tsSubCmds, leftCount);
 | 
			
		||||
    preparedWrapper.tsSubCmds = TelemetryPluginCmdsWrapper.popCmds(this.tsSubCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.tsSubCmds.length;
 | 
			
		||||
    preparedWrapper.historyCmds = this.popCmds(this.historyCmds, leftCount);
 | 
			
		||||
    preparedWrapper.historyCmds = TelemetryPluginCmdsWrapper.popCmds(this.historyCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.historyCmds.length;
 | 
			
		||||
    preparedWrapper.attrSubCmds = this.popCmds(this.attrSubCmds, leftCount);
 | 
			
		||||
    preparedWrapper.attrSubCmds = TelemetryPluginCmdsWrapper.popCmds(this.attrSubCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.attrSubCmds.length;
 | 
			
		||||
    preparedWrapper.entityDataCmds = this.popCmds(this.entityDataCmds, leftCount);
 | 
			
		||||
    preparedWrapper.entityDataCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityDataCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.entityDataCmds.length;
 | 
			
		||||
    preparedWrapper.entityDataUnsubscribeCmds = this.popCmds(this.entityDataUnsubscribeCmds, leftCount);
 | 
			
		||||
    preparedWrapper.entityDataUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityDataUnsubscribeCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.entityDataUnsubscribeCmds.length;
 | 
			
		||||
    preparedWrapper.alarmDataCmds = this.popCmds(this.alarmDataCmds, leftCount);
 | 
			
		||||
    preparedWrapper.alarmDataCmds = TelemetryPluginCmdsWrapper.popCmds(this.alarmDataCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.alarmDataCmds.length;
 | 
			
		||||
    preparedWrapper.alarmDataUnsubscribeCmds = this.popCmds(this.alarmDataUnsubscribeCmds, leftCount);
 | 
			
		||||
    preparedWrapper.alarmDataUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.alarmDataUnsubscribeCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.alarmDataUnsubscribeCmds.length;
 | 
			
		||||
    preparedWrapper.entityCountCmds = this.popCmds(this.entityCountCmds, leftCount);
 | 
			
		||||
    preparedWrapper.entityCountCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityCountCmds, leftCount);
 | 
			
		||||
    leftCount -= preparedWrapper.entityCountCmds.length;
 | 
			
		||||
    preparedWrapper.entityCountUnsubscribeCmds = this.popCmds(this.entityCountUnsubscribeCmds, leftCount);
 | 
			
		||||
    preparedWrapper.entityCountUnsubscribeCmds = TelemetryPluginCmdsWrapper.popCmds(this.entityCountUnsubscribeCmds, leftCount);
 | 
			
		||||
    return preparedWrapper;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private popCmds<T>(cmds: Array<T>, leftCount: number): Array<T> {
 | 
			
		||||
    const toPublish = Math.min(cmds.length, leftCount);
 | 
			
		||||
    if (toPublish > 0) {
 | 
			
		||||
      return cmds.splice(0, toPublish);
 | 
			
		||||
    } else {
 | 
			
		||||
      return [];
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface SubscriptionData {
 | 
			
		||||
  [key: string]: [number, any][];
 | 
			
		||||
  [key: string]: [number, any, number?][];
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface SubscriptionDataHolder {
 | 
			
		||||
@ -454,16 +454,7 @@ export class EntityDataUpdate extends DataUpdate<EntityData> {
 | 
			
		||||
    super(msg);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public prepareData(tsOffset: number) {
 | 
			
		||||
    if (this.data) {
 | 
			
		||||
      this.processEntityData(this.data.data, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.update) {
 | 
			
		||||
      this.processEntityData(this.update, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private processEntityData(data: Array<EntityData>, tsOffset: number) {
 | 
			
		||||
  private static processEntityData(data: Array<EntityData>, tsOffset: number) {
 | 
			
		||||
    for (const entityData of data) {
 | 
			
		||||
      if (entityData.timeseries) {
 | 
			
		||||
        for (const key of Object.keys(entityData.timeseries)) {
 | 
			
		||||
@ -491,28 +482,28 @@ export class EntityDataUpdate extends DataUpdate<EntityData> {
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public prepareData(tsOffset: number) {
 | 
			
		||||
    if (this.data) {
 | 
			
		||||
      EntityDataUpdate.processEntityData(this.data.data, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.update) {
 | 
			
		||||
      EntityDataUpdate.processEntityData(this.update, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export class AlarmDataUpdate extends DataUpdate<AlarmData> {
 | 
			
		||||
  allowedEntities: number;
 | 
			
		||||
  totalEntities: number;
 | 
			
		||||
 | 
			
		||||
  constructor(msg: AlarmDataUpdateMsg) {
 | 
			
		||||
    super(msg);
 | 
			
		||||
    this.allowedEntities = msg.allowedEntities;
 | 
			
		||||
    this.totalEntities = msg.totalEntities;
 | 
			
		||||
  }
 | 
			
		||||
  allowedEntities: number;
 | 
			
		||||
  totalEntities: number;
 | 
			
		||||
 | 
			
		||||
  public prepareData(tsOffset: number) {
 | 
			
		||||
    if (this.data) {
 | 
			
		||||
      this.processAlarmData(this.data.data, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.update) {
 | 
			
		||||
      this.processAlarmData(this.update, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private processAlarmData(data: Array<AlarmData>, tsOffset: number) {
 | 
			
		||||
  private static processAlarmData(data: Array<AlarmData>, tsOffset: number) {
 | 
			
		||||
    for (const alarmData of data) {
 | 
			
		||||
      alarmData.createdTime += tsOffset;
 | 
			
		||||
      if (alarmData.ackTs) {
 | 
			
		||||
@ -544,6 +535,15 @@ export class AlarmDataUpdate extends DataUpdate<AlarmData> {
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public prepareData(tsOffset: number) {
 | 
			
		||||
    if (this.data) {
 | 
			
		||||
      AlarmDataUpdate.processAlarmData(this.data.data, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
    if (this.update) {
 | 
			
		||||
      AlarmDataUpdate.processAlarmData(this.update, tsOffset);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export class EntityCountUpdate extends CmdUpdate {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user