/* * Copyright © 2016-2017 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ export default class DataAggregator { constructor(onDataCb, tsKeyNames, startTs, limit, aggregationType, timeWindow, interval, types, $timeout, $filter) { this.onDataCb = onDataCb; this.tsKeyNames = tsKeyNames; this.dataBuffer = {}; for (var k = 0; k < tsKeyNames.length; k++) { this.dataBuffer[tsKeyNames[k]] = []; } this.startTs = startTs; this.aggregationType = aggregationType; this.types = types; this.$timeout = $timeout; this.$filter = $filter; this.dataReceived = false; this.resetPending = false; this.noAggregation = aggregationType === types.aggregation.none.value; this.limit = limit; this.timeWindow = timeWindow; this.interval = interval; this.aggregationTimeout = Math.max(this.interval, 1000); switch (aggregationType) { case types.aggregation.min.value: this.aggFunction = min; break; case types.aggregation.max.value: this.aggFunction = max; break; case types.aggregation.avg.value: this.aggFunction = avg; break; case types.aggregation.sum.value: this.aggFunction = sum; break; case types.aggregation.count.value: this.aggFunction = count; break; case types.aggregation.none.value: this.aggFunction = none; break; default: this.aggFunction = avg; } } reset(startTs, timeWindow, interval) { if (this.intervalTimeoutHandle) { this.$timeout.cancel(this.intervalTimeoutHandle); this.intervalTimeoutHandle = null; } this.intervalScheduledTime = currentTime(); this.startTs = startTs; this.timeWindow = timeWindow; this.interval = interval; this.endTs = this.startTs + this.timeWindow; this.elapsed = 0; this.aggregationTimeout = Math.max(this.interval, 1000); this.resetPending = true; var self = this; this.intervalTimeoutHandle = this.$timeout(function() { self.onInterval(); }, this.aggregationTimeout, false); } onData(data, update, history, apply) { if (!this.dataReceived || this.resetPending) { var updateIntervalScheduledTime = true; if (!this.dataReceived) { this.elapsed = 0; this.dataReceived = true; this.endTs = this.startTs + this.timeWindow; } if (this.resetPending) { this.resetPending = false; updateIntervalScheduledTime = false; } if (update) { this.aggregationMap = {}; updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value, this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs); } else { this.aggregationMap = processAggregatedData(data.data, this.aggregationType === this.types.aggregation.count.value, this.noAggregation); } if (updateIntervalScheduledTime) { this.intervalScheduledTime = currentTime(); } this.onInterval(history, apply); } else { updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value, this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs); if (history) { this.intervalScheduledTime = currentTime(); this.onInterval(history, apply); } } } onInterval(history, apply) { var now = currentTime(); this.elapsed += now - this.intervalScheduledTime; this.intervalScheduledTime = now; if (this.intervalTimeoutHandle) { this.$timeout.cancel(this.intervalTimeoutHandle); this.intervalTimeoutHandle = null; } if (!history) { var delta = Math.floor(this.elapsed / this.interval); if (delta || !this.data) { this.startTs += delta * this.interval; this.endTs += delta * this.interval; this.data = this.updateData(); this.elapsed = this.elapsed - delta * this.interval; } } else { this.data = this.updateData(); } if (this.onDataCb) { this.onDataCb(this.data, apply); } var self = this; if (!history) { this.intervalTimeoutHandle = this.$timeout(function() { self.onInterval(); }, this.aggregationTimeout, false); } } updateData() { for (var k = 0; k < this.tsKeyNames.length; k++) { this.dataBuffer[this.tsKeyNames[k]] = []; } for (var key in this.aggregationMap) { var aggKeyData = this.aggregationMap[key]; var keyData = this.dataBuffer[key]; for (var aggTimestamp in aggKeyData) { if (aggTimestamp <= this.startTs) { delete aggKeyData[aggTimestamp]; } else if (aggTimestamp <= this.endTs) { var aggData = aggKeyData[aggTimestamp]; var kvPair = [Number(aggTimestamp), aggData.aggValue]; keyData.push(kvPair); } } keyData = this.$filter('orderBy')(keyData, '+this[0]'); if (keyData.length > this.limit) { keyData = keyData.slice(keyData.length - this.limit); } this.dataBuffer[key] = keyData; } return this.dataBuffer; } destroy() { if (this.intervalTimeoutHandle) { this.$timeout.cancel(this.intervalTimeoutHandle); this.intervalTimeoutHandle = null; } this.aggregationMap = null; } } /* eslint-disable */ function currentTime() { return window.performance && window.performance.now ? window.performance.now() : Date.now(); } /* eslint-enable */ function processAggregatedData(data, isCount, noAggregation) { var aggregationMap = {}; for (var key in data) { var aggKeyData = aggregationMap[key]; if (!aggKeyData) { aggKeyData = {}; aggregationMap[key] = aggKeyData; } var keyData = data[key]; for (var i = 0; i < keyData.length; i++) { var kvPair = keyData[i]; var timestamp = kvPair[0]; var value = convertValue(kvPair[1], noAggregation); var aggKey = timestamp; var aggData = { count: isCount ? value : 1, sum: value, aggValue: value } aggKeyData[aggKey] = aggData; } } return aggregationMap; } function updateAggregatedData(aggregationMap, isCount, noAggregation, aggFunction, data, interval, startTs) { for (var key in data) { var aggKeyData = aggregationMap[key]; if (!aggKeyData) { aggKeyData = {}; aggregationMap[key] = aggKeyData; } var keyData = data[key]; for (var i = 0; i < keyData.length; i++) { var kvPair = keyData[i]; var timestamp = kvPair[0]; var value = convertValue(kvPair[1], noAggregation); var aggTimestamp = noAggregation ? timestamp : (startTs + Math.floor((timestamp - startTs) / interval) * interval + interval/2); var aggData = aggKeyData[aggTimestamp]; if (!aggData) { aggData = { count: 1, sum: value, aggValue: isCount ? 1 : value } aggKeyData[aggTimestamp] = aggData; } else { aggFunction(aggData, value); } } } } function convertValue(value, noAggregation) { if (!noAggregation || value && isNumeric(value)) { return Number(value); } else { return value; } } function isNumeric(value) { return (value - parseFloat( value ) + 1) >= 0; } function avg(aggData, value) { aggData.count++; aggData.sum += value; aggData.aggValue = aggData.sum / aggData.count; } function min(aggData, value) { aggData.aggValue = Math.min(aggData.aggValue, value); } function max(aggData, value) { aggData.aggValue = Math.max(aggData.aggValue, value); } function sum(aggData, value) { aggData.aggValue = aggData.aggValue + value; } function count(aggData) { aggData.count++; aggData.aggValue = aggData.count; } function none(aggData, value) { aggData.aggValue = value; }