2017-02-24 13:04:52 +02:00
|
|
|
/*
|
|
|
|
|
* Copyright © 2016-2017 The Thingsboard Authors
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
|
*
|
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
*
|
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
|
* limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
export default class DataAggregator {
|
|
|
|
|
|
2017-02-27 15:07:43 +02:00
|
|
|
constructor(onDataCb, tsKeyNames, startTs, limit, aggregationType, timeWindow, interval, types, $timeout, $filter) {
|
2017-02-24 13:04:52 +02:00
|
|
|
this.onDataCb = onDataCb;
|
2017-02-27 15:07:43 +02:00
|
|
|
this.tsKeyNames = tsKeyNames;
|
|
|
|
|
this.startTs = startTs;
|
2017-02-24 13:04:52 +02:00
|
|
|
this.aggregationType = aggregationType;
|
|
|
|
|
this.types = types;
|
|
|
|
|
this.$timeout = $timeout;
|
|
|
|
|
this.$filter = $filter;
|
|
|
|
|
this.dataReceived = false;
|
2017-02-28 19:03:44 +02:00
|
|
|
this.resetPending = false;
|
2017-02-24 13:04:52 +02:00
|
|
|
this.noAggregation = aggregationType === types.aggregation.none.value;
|
2017-02-27 15:07:43 +02:00
|
|
|
this.limit = limit;
|
|
|
|
|
this.timeWindow = timeWindow;
|
|
|
|
|
this.interval = interval;
|
2017-02-28 19:03:44 +02:00
|
|
|
this.aggregationTimeout = Math.max(this.interval, 1000);
|
2017-02-24 13:04:52 +02:00
|
|
|
switch (aggregationType) {
|
|
|
|
|
case types.aggregation.min.value:
|
|
|
|
|
this.aggFunction = min;
|
|
|
|
|
break;
|
|
|
|
|
case types.aggregation.max.value:
|
2017-02-27 15:07:43 +02:00
|
|
|
this.aggFunction = max;
|
2017-02-24 13:04:52 +02:00
|
|
|
break;
|
|
|
|
|
case types.aggregation.avg.value:
|
|
|
|
|
this.aggFunction = avg;
|
|
|
|
|
break;
|
|
|
|
|
case types.aggregation.sum.value:
|
|
|
|
|
this.aggFunction = sum;
|
|
|
|
|
break;
|
|
|
|
|
case types.aggregation.count.value:
|
|
|
|
|
this.aggFunction = count;
|
|
|
|
|
break;
|
|
|
|
|
case types.aggregation.none.value:
|
|
|
|
|
this.aggFunction = none;
|
|
|
|
|
break;
|
|
|
|
|
default:
|
|
|
|
|
this.aggFunction = avg;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-02-28 19:03:44 +02:00
|
|
|
reset(startTs, timeWindow, interval) {
|
|
|
|
|
if (this.intervalTimeoutHandle) {
|
|
|
|
|
this.$timeout.cancel(this.intervalTimeoutHandle);
|
|
|
|
|
this.intervalTimeoutHandle = null;
|
|
|
|
|
}
|
|
|
|
|
this.intervalScheduledTime = currentTime();
|
|
|
|
|
this.startTs = startTs;
|
|
|
|
|
this.timeWindow = timeWindow;
|
|
|
|
|
this.interval = interval;
|
|
|
|
|
this.endTs = this.startTs + this.timeWindow;
|
|
|
|
|
this.elapsed = 0;
|
|
|
|
|
this.aggregationTimeout = Math.max(this.interval, 1000);
|
|
|
|
|
this.resetPending = true;
|
|
|
|
|
var self = this;
|
|
|
|
|
this.intervalTimeoutHandle = this.$timeout(function() {
|
|
|
|
|
self.onInterval();
|
|
|
|
|
}, this.aggregationTimeout, false);
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-01 20:02:01 +02:00
|
|
|
onData(data, update, history, apply) {
|
2017-02-28 19:03:44 +02:00
|
|
|
if (!this.dataReceived || this.resetPending) {
|
|
|
|
|
var updateIntervalScheduledTime = true;
|
|
|
|
|
if (!this.dataReceived) {
|
|
|
|
|
this.elapsed = 0;
|
|
|
|
|
this.dataReceived = true;
|
|
|
|
|
this.endTs = this.startTs + this.timeWindow;
|
|
|
|
|
}
|
|
|
|
|
if (this.resetPending) {
|
|
|
|
|
this.resetPending = false;
|
|
|
|
|
updateIntervalScheduledTime = false;
|
|
|
|
|
}
|
2017-02-27 15:07:43 +02:00
|
|
|
if (update) {
|
|
|
|
|
this.aggregationMap = {};
|
|
|
|
|
updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value,
|
|
|
|
|
this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs);
|
|
|
|
|
} else {
|
|
|
|
|
this.aggregationMap = processAggregatedData(data.data, this.aggregationType === this.types.aggregation.count.value, this.noAggregation);
|
|
|
|
|
}
|
2017-02-28 19:03:44 +02:00
|
|
|
if (updateIntervalScheduledTime) {
|
|
|
|
|
this.intervalScheduledTime = currentTime();
|
|
|
|
|
}
|
2017-03-01 20:02:01 +02:00
|
|
|
this.onInterval(history, apply);
|
2017-02-24 13:04:52 +02:00
|
|
|
} else {
|
|
|
|
|
updateAggregatedData(this.aggregationMap, this.aggregationType === this.types.aggregation.count.value,
|
|
|
|
|
this.noAggregation, this.aggFunction, data.data, this.interval, this.startTs);
|
2017-02-27 15:07:43 +02:00
|
|
|
if (history) {
|
2017-02-28 19:03:44 +02:00
|
|
|
this.intervalScheduledTime = currentTime();
|
2017-03-01 20:02:01 +02:00
|
|
|
this.onInterval(history, apply);
|
2017-02-27 15:07:43 +02:00
|
|
|
}
|
2017-02-24 13:04:52 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-01 20:02:01 +02:00
|
|
|
onInterval(history, apply) {
|
2017-02-24 13:04:52 +02:00
|
|
|
var now = currentTime();
|
2017-02-28 19:03:44 +02:00
|
|
|
this.elapsed += now - this.intervalScheduledTime;
|
|
|
|
|
this.intervalScheduledTime = now;
|
2017-02-24 13:04:52 +02:00
|
|
|
if (this.intervalTimeoutHandle) {
|
|
|
|
|
this.$timeout.cancel(this.intervalTimeoutHandle);
|
|
|
|
|
this.intervalTimeoutHandle = null;
|
|
|
|
|
}
|
2017-02-27 15:07:43 +02:00
|
|
|
if (!history) {
|
|
|
|
|
var delta = Math.floor(this.elapsed / this.interval);
|
|
|
|
|
if (delta || !this.data) {
|
|
|
|
|
this.startTs += delta * this.interval;
|
|
|
|
|
this.endTs += delta * this.interval;
|
|
|
|
|
this.data = toData(this.tsKeyNames, this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit);
|
|
|
|
|
this.elapsed = this.elapsed - delta * this.interval;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
this.data = toData(this.tsKeyNames, this.aggregationMap, this.startTs, this.endTs, this.$filter, this.limit);
|
2017-02-24 13:04:52 +02:00
|
|
|
}
|
|
|
|
|
if (this.onDataCb) {
|
2017-03-01 20:02:01 +02:00
|
|
|
this.onDataCb(this.data, this.startTs, this.endTs, apply);
|
2017-02-24 13:04:52 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var self = this;
|
2017-02-27 15:07:43 +02:00
|
|
|
if (!history) {
|
|
|
|
|
this.intervalTimeoutHandle = this.$timeout(function() {
|
2017-02-28 19:03:44 +02:00
|
|
|
self.onInterval();
|
2017-02-27 15:07:43 +02:00
|
|
|
}, this.aggregationTimeout, false);
|
|
|
|
|
}
|
2017-02-24 13:04:52 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
destroy() {
|
|
|
|
|
if (this.intervalTimeoutHandle) {
|
|
|
|
|
this.$timeout.cancel(this.intervalTimeoutHandle);
|
|
|
|
|
this.intervalTimeoutHandle = null;
|
|
|
|
|
}
|
|
|
|
|
this.aggregationMap = null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* eslint-disable */
|
|
|
|
|
function currentTime() {
|
|
|
|
|
return window.performance && window.performance.now ?
|
|
|
|
|
window.performance.now() : Date.now();
|
|
|
|
|
}
|
|
|
|
|
/* eslint-enable */
|
|
|
|
|
|
|
|
|
|
function processAggregatedData(data, isCount, noAggregation) {
|
|
|
|
|
var aggregationMap = {};
|
|
|
|
|
for (var key in data) {
|
|
|
|
|
var aggKeyData = aggregationMap[key];
|
|
|
|
|
if (!aggKeyData) {
|
|
|
|
|
aggKeyData = {};
|
|
|
|
|
aggregationMap[key] = aggKeyData;
|
|
|
|
|
}
|
|
|
|
|
var keyData = data[key];
|
|
|
|
|
for (var i in keyData) {
|
|
|
|
|
var kvPair = keyData[i];
|
|
|
|
|
var timestamp = kvPair[0];
|
|
|
|
|
var value = convertValue(kvPair[1], noAggregation);
|
|
|
|
|
var aggKey = timestamp;
|
|
|
|
|
var aggData = {
|
|
|
|
|
count: isCount ? value : 1,
|
|
|
|
|
sum: value,
|
|
|
|
|
aggValue: value
|
|
|
|
|
}
|
|
|
|
|
aggKeyData[aggKey] = aggData;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return aggregationMap;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function updateAggregatedData(aggregationMap, isCount, noAggregation, aggFunction, data, interval, startTs) {
|
|
|
|
|
for (var key in data) {
|
|
|
|
|
var aggKeyData = aggregationMap[key];
|
|
|
|
|
if (!aggKeyData) {
|
|
|
|
|
aggKeyData = {};
|
|
|
|
|
aggregationMap[key] = aggKeyData;
|
|
|
|
|
}
|
|
|
|
|
var keyData = data[key];
|
|
|
|
|
for (var i in keyData) {
|
|
|
|
|
var kvPair = keyData[i];
|
|
|
|
|
var timestamp = kvPair[0];
|
|
|
|
|
var value = convertValue(kvPair[1], noAggregation);
|
|
|
|
|
var aggTimestamp = noAggregation ? timestamp : (startTs + Math.floor((timestamp - startTs) / interval) * interval + interval/2);
|
|
|
|
|
var aggData = aggKeyData[aggTimestamp];
|
|
|
|
|
if (!aggData) {
|
|
|
|
|
aggData = {
|
|
|
|
|
count: 1,
|
|
|
|
|
sum: value,
|
|
|
|
|
aggValue: isCount ? 1 : value
|
|
|
|
|
}
|
|
|
|
|
aggKeyData[aggTimestamp] = aggData;
|
|
|
|
|
} else {
|
|
|
|
|
aggFunction(aggData, value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-02-27 15:07:43 +02:00
|
|
|
function toData(tsKeyNames, aggregationMap, startTs, endTs, $filter, limit) {
|
2017-02-24 13:04:52 +02:00
|
|
|
var data = {};
|
2017-02-27 15:07:43 +02:00
|
|
|
for (var k in tsKeyNames) {
|
|
|
|
|
data[tsKeyNames[k]] = [];
|
|
|
|
|
}
|
2017-02-24 13:04:52 +02:00
|
|
|
for (var key in aggregationMap) {
|
|
|
|
|
var aggKeyData = aggregationMap[key];
|
|
|
|
|
var keyData = data[key];
|
|
|
|
|
for (var aggTimestamp in aggKeyData) {
|
|
|
|
|
if (aggTimestamp <= startTs) {
|
|
|
|
|
delete aggKeyData[aggTimestamp];
|
|
|
|
|
} else if (aggTimestamp <= endTs) {
|
|
|
|
|
var aggData = aggKeyData[aggTimestamp];
|
2017-02-27 15:07:43 +02:00
|
|
|
var kvPair = [Number(aggTimestamp), aggData.aggValue];
|
2017-02-24 13:04:52 +02:00
|
|
|
keyData.push(kvPair);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
keyData = $filter('orderBy')(keyData, '+this[0]');
|
|
|
|
|
if (keyData.length > limit) {
|
|
|
|
|
keyData = keyData.slice(keyData.length - limit);
|
|
|
|
|
}
|
|
|
|
|
data[key] = keyData;
|
|
|
|
|
}
|
|
|
|
|
return data;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function convertValue(value, noAggregation) {
|
|
|
|
|
if (!noAggregation || value && isNumeric(value)) {
|
|
|
|
|
return Number(value);
|
|
|
|
|
} else {
|
|
|
|
|
return value;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function isNumeric(value) {
|
|
|
|
|
return (value - parseFloat( value ) + 1) >= 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function avg(aggData, value) {
|
|
|
|
|
aggData.count++;
|
|
|
|
|
aggData.sum += value;
|
|
|
|
|
aggData.aggValue = aggData.sum / aggData.count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function min(aggData, value) {
|
|
|
|
|
aggData.aggValue = Math.min(aggData.aggValue, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function max(aggData, value) {
|
|
|
|
|
aggData.aggValue = Math.max(aggData.aggValue, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function sum(aggData, value) {
|
|
|
|
|
aggData.aggValue = aggData.aggValue + value;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function count(aggData) {
|
|
|
|
|
aggData.count++;
|
|
|
|
|
aggData.aggValue = aggData.count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function none(aggData, value) {
|
|
|
|
|
aggData.aggValue = value;
|
|
|
|
|
}
|