527 lines
14 KiB
TypeScript
Raw Normal View History

2019-08-19 20:09:41 +03:00
///
2020-02-20 10:26:43 +02:00
/// Copyright © 2016-2020 The Thingsboard Authors
2019-08-19 20:09:41 +03:00
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
2019-08-30 19:19:45 +03:00
import { EntityType } from '@shared/models/entity-type.models';
import { AggregationType } from '../time/time.models';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { EntityId } from '@shared/models/id/entity-id';
import { map } from 'rxjs/operators';
import { NgZone } from '@angular/core';
2020-11-09 15:54:05 +02:00
import {
AlarmData,
AlarmDataQuery,
EntityData,
EntityDataQuery,
EntityKey,
TsValue
} from '@shared/models/query/query.models';
import { PageData } from '@shared/models/page/page-data';
2019-08-29 20:04:59 +03:00
2019-08-19 20:09:41 +03:00
export enum DataKeyType {
timeseries = 'timeseries',
attribute = 'attribute',
function = 'function',
2020-02-21 19:04:49 +02:00
alarm = 'alarm',
entityField = 'entityField'
2019-08-19 20:09:41 +03:00
}
2019-08-29 20:04:59 +03:00
export enum LatestTelemetry {
LATEST_TELEMETRY = 'LATEST_TELEMETRY'
}
export enum AttributeScope {
CLIENT_SCOPE = 'CLIENT_SCOPE',
SERVER_SCOPE = 'SERVER_SCOPE',
SHARED_SCOPE = 'SHARED_SCOPE'
}
2019-08-30 19:19:45 +03:00
export enum TelemetryFeature {
ATTRIBUTES = 'ATTRIBUTES',
TIMESERIES = 'TIMESERIES'
}
2019-08-29 20:04:59 +03:00
export type TelemetryType = LatestTelemetry | AttributeScope;
export function toTelemetryType(val: string): TelemetryType {
if (LatestTelemetry[val]) {
return LatestTelemetry[val];
} else {
return AttributeScope[val];
}
}
2019-08-29 20:04:59 +03:00
export const telemetryTypeTranslations = new Map<TelemetryType, string>(
[
[LatestTelemetry.LATEST_TELEMETRY, 'attribute.scope-latest-telemetry'],
[AttributeScope.CLIENT_SCOPE, 'attribute.scope-client'],
[AttributeScope.SERVER_SCOPE, 'attribute.scope-server'],
[AttributeScope.SHARED_SCOPE, 'attribute.scope-shared']
]
);
export const isClientSideTelemetryType = new Map<TelemetryType, boolean>(
[
[LatestTelemetry.LATEST_TELEMETRY, true],
[AttributeScope.CLIENT_SCOPE, true],
[AttributeScope.SERVER_SCOPE, false],
[AttributeScope.SHARED_SCOPE, false]
]
);
export interface AttributeData {
lastUpdateTs?: number;
2019-08-29 20:04:59 +03:00
key: string;
value: any;
}
2019-08-30 19:19:45 +03:00
export interface TimeseriesData {
2020-11-09 15:54:05 +02:00
[key: string]: Array<TsValue>;
}
export enum DataSortOrder {
ASC = 'ASC',
DESC = 'DESC'
}
export interface WebsocketCmd {
2019-08-30 19:19:45 +03:00
cmdId: number;
}
export interface TelemetryPluginCmd extends WebsocketCmd {
2019-08-30 19:19:45 +03:00
keys: string;
}
export abstract class SubscriptionCmd implements TelemetryPluginCmd {
cmdId: number;
keys: string;
entityType: EntityType;
entityId: string;
scope?: AttributeScope;
unsubscribe: boolean;
abstract getType(): TelemetryFeature;
}
export class AttributesSubscriptionCmd extends SubscriptionCmd {
getType() {
return TelemetryFeature.ATTRIBUTES;
}
}
export class TimeseriesSubscriptionCmd extends SubscriptionCmd {
startTs: number;
timeWindow: number;
interval: number;
limit: number;
agg: AggregationType;
getType() {
return TelemetryFeature.TIMESERIES;
}
}
export class GetHistoryCmd implements TelemetryPluginCmd {
cmdId: number;
keys: string;
entityType: EntityType;
entityId: string;
startTs: number;
endTs: number;
interval: number;
limit: number;
agg: AggregationType;
}
export interface EntityHistoryCmd {
keys: Array<string>;
startTs: number;
endTs: number;
interval: number;
limit: number;
agg: AggregationType;
2020-06-24 18:07:47 +03:00
fetchLatestPreviousPoint?: boolean;
}
export interface LatestValueCmd {
2020-06-22 18:55:15 +03:00
keys: Array<EntityKey>;
}
export interface TimeSeriesCmd {
keys: Array<string>;
startTs: number;
timeWindow: number;
interval: number;
limit: number;
agg: AggregationType;
2020-06-24 18:07:47 +03:00
fetchLatestPreviousPoint?: boolean;
}
export class EntityDataCmd implements WebsocketCmd {
cmdId: number;
2020-06-22 18:55:15 +03:00
query?: EntityDataQuery;
historyCmd?: EntityHistoryCmd;
latestCmd?: LatestValueCmd;
tsCmd?: TimeSeriesCmd;
2020-06-23 20:56:44 +03:00
public isEmpty(): boolean {
return !this.query && !this.historyCmd && !this.latestCmd && !this.tsCmd;
}
}
2020-07-03 18:33:06 +03:00
export class AlarmDataCmd implements WebsocketCmd {
cmdId: number;
query?: AlarmDataQuery;
public isEmpty(): boolean {
return !this.query;
}
}
export class EntityDataUnsubscribeCmd implements WebsocketCmd {
cmdId: number;
}
2020-07-03 18:33:06 +03:00
export class AlarmDataUnsubscribeCmd implements WebsocketCmd {
cmdId: number;
}
2019-08-30 19:19:45 +03:00
export class TelemetryPluginCmdsWrapper {
attrSubCmds: Array<AttributesSubscriptionCmd>;
tsSubCmds: Array<TimeseriesSubscriptionCmd>;
historyCmds: Array<GetHistoryCmd>;
entityDataCmds: Array<EntityDataCmd>;
entityDataUnsubscribeCmds: Array<EntityDataUnsubscribeCmd>;
2020-07-03 18:33:06 +03:00
alarmDataCmds: Array<AlarmDataCmd>;
alarmDataUnsubscribeCmds: Array<AlarmDataUnsubscribeCmd>;
2019-08-30 19:19:45 +03:00
constructor() {
this.attrSubCmds = [];
this.tsSubCmds = [];
this.historyCmds = [];
this.entityDataCmds = [];
this.entityDataUnsubscribeCmds = [];
2020-07-03 18:33:06 +03:00
this.alarmDataCmds = [];
this.alarmDataUnsubscribeCmds = [];
2019-08-30 19:19:45 +03:00
}
public hasCommands(): boolean {
return this.tsSubCmds.length > 0 ||
this.historyCmds.length > 0 ||
this.attrSubCmds.length > 0 ||
this.entityDataCmds.length > 0 ||
2020-07-03 18:33:06 +03:00
this.entityDataUnsubscribeCmds.length > 0 ||
this.alarmDataCmds.length > 0 ||
this.alarmDataUnsubscribeCmds.length > 0;
2019-08-30 19:19:45 +03:00
}
public clear() {
this.attrSubCmds.length = 0;
this.tsSubCmds.length = 0;
this.historyCmds.length = 0;
this.entityDataCmds.length = 0;
this.entityDataUnsubscribeCmds.length = 0;
2020-07-03 18:33:06 +03:00
this.alarmDataCmds.length = 0;
this.alarmDataUnsubscribeCmds.length = 0;
2019-08-30 19:19:45 +03:00
}
public preparePublishCommands(maxCommands: number): TelemetryPluginCmdsWrapper {
const preparedWrapper = new TelemetryPluginCmdsWrapper();
let leftCount = maxCommands;
preparedWrapper.tsSubCmds = this.popCmds(this.tsSubCmds, leftCount);
leftCount -= preparedWrapper.tsSubCmds.length;
preparedWrapper.historyCmds = this.popCmds(this.historyCmds, leftCount);
leftCount -= preparedWrapper.historyCmds.length;
preparedWrapper.attrSubCmds = this.popCmds(this.attrSubCmds, leftCount);
leftCount -= preparedWrapper.attrSubCmds.length;
preparedWrapper.entityDataCmds = this.popCmds(this.entityDataCmds, leftCount);
leftCount -= preparedWrapper.entityDataCmds.length;
preparedWrapper.entityDataUnsubscribeCmds = this.popCmds(this.entityDataUnsubscribeCmds, leftCount);
2020-07-03 18:33:06 +03:00
leftCount -= preparedWrapper.entityDataUnsubscribeCmds.length;
preparedWrapper.alarmDataCmds = this.popCmds(this.alarmDataCmds, leftCount);
leftCount -= preparedWrapper.alarmDataCmds.length;
preparedWrapper.alarmDataUnsubscribeCmds = this.popCmds(this.alarmDataUnsubscribeCmds, leftCount);
2019-08-30 19:19:45 +03:00
return preparedWrapper;
}
private popCmds<T>(cmds: Array<T>, leftCount: number): Array<T> {
2019-08-30 19:19:45 +03:00
const toPublish = Math.min(cmds.length, leftCount);
if (toPublish > 0) {
return cmds.splice(0, toPublish);
} else {
return [];
}
}
}
export interface SubscriptionData {
[key: string]: [number, any][];
}
export interface SubscriptionDataHolder {
data: SubscriptionData;
}
export interface SubscriptionUpdateMsg extends SubscriptionDataHolder {
2019-08-30 19:19:45 +03:00
subscriptionId: number;
errorCode: number;
errorMsg: string;
}
2020-07-03 18:33:06 +03:00
export enum DataUpdateType {
ENTITY_DATA = 'ENTITY_DATA',
ALARM_DATA = 'ALARM_DATA'
}
export interface DataUpdateMsg<T> {
cmdId: number;
2020-07-03 18:33:06 +03:00
data?: PageData<T>;
update?: Array<T>;
errorCode: number;
errorMsg: string;
2020-07-03 18:33:06 +03:00
dataUpdateType: DataUpdateType;
}
export interface EntityDataUpdateMsg extends DataUpdateMsg<EntityData> {
dataUpdateType: DataUpdateType.ENTITY_DATA;
}
2020-07-03 18:33:06 +03:00
export interface AlarmDataUpdateMsg extends DataUpdateMsg<AlarmData> {
dataUpdateType: DataUpdateType.ALARM_DATA;
allowedEntities: number;
totalEntities: number;
2020-07-03 18:33:06 +03:00
}
export type WebsocketDataMsg = AlarmDataUpdateMsg | EntityDataUpdateMsg | SubscriptionUpdateMsg;
export function isEntityDataUpdateMsg(message: WebsocketDataMsg): message is EntityDataUpdateMsg {
2020-07-03 18:33:06 +03:00
const updateMsg = (message as DataUpdateMsg<any>);
return updateMsg.cmdId !== undefined && updateMsg.dataUpdateType === DataUpdateType.ENTITY_DATA;
}
export function isAlarmDataUpdateMsg(message: WebsocketDataMsg): message is AlarmDataUpdateMsg {
const updateMsg = (message as DataUpdateMsg<any>);
return updateMsg.cmdId !== undefined && updateMsg.dataUpdateType === DataUpdateType.ALARM_DATA;
}
2019-08-30 19:19:45 +03:00
export class SubscriptionUpdate implements SubscriptionUpdateMsg {
subscriptionId: number;
errorCode: number;
errorMsg: string;
data: SubscriptionData;
2019-08-30 19:19:45 +03:00
constructor(msg: SubscriptionUpdateMsg) {
this.subscriptionId = msg.subscriptionId;
this.errorCode = msg.errorCode;
this.errorMsg = msg.errorMsg;
this.data = msg.data;
}
public prepareData(keys: string[]) {
if (!this.data) {
this.data = {};
}
if (keys) {
keys.forEach((key) => {
if (!this.data[key]) {
this.data[key] = [];
}
});
}
}
public updateAttributeData(origData: Array<AttributeData>): Array<AttributeData> {
for (const key of Object.keys(this.data)) {
const keyData = this.data[key];
if (keyData.length) {
const existing = origData.find((data) => data.key === key);
if (existing) {
existing.lastUpdateTs = keyData[0][0];
existing.value = keyData[0][1];
} else {
origData.push(
{
key,
lastUpdateTs: keyData[0][0],
value: keyData[0][1]
}
);
}
}
}
return origData;
}
}
2020-07-03 18:33:06 +03:00
export class DataUpdate<T> implements DataUpdateMsg<T> {
cmdId: number;
errorCode: number;
errorMsg: string;
2020-07-03 18:33:06 +03:00
data?: PageData<T>;
update?: Array<T>;
dataUpdateType: DataUpdateType;
2020-07-03 18:33:06 +03:00
constructor(msg: DataUpdateMsg<T>) {
this.cmdId = msg.cmdId;
this.errorCode = msg.errorCode;
this.errorMsg = msg.errorMsg;
this.data = msg.data;
this.update = msg.update;
2020-07-03 18:33:06 +03:00
this.dataUpdateType = msg.dataUpdateType;
}
}
export class EntityDataUpdate extends DataUpdate<EntityData> {
constructor(msg: EntityDataUpdateMsg) {
super(msg);
}
}
export class AlarmDataUpdate extends DataUpdate<AlarmData> {
allowedEntities: number;
totalEntities: number;
2020-07-03 18:33:06 +03:00
constructor(msg: AlarmDataUpdateMsg) {
super(msg);
this.allowedEntities = msg.allowedEntities;
this.totalEntities = msg.totalEntities;
}
}
2019-08-30 19:19:45 +03:00
export interface TelemetryService {
subscribe(subscriber: TelemetrySubscriber);
2020-06-22 18:55:15 +03:00
update(subscriber: TelemetrySubscriber);
2019-08-30 19:19:45 +03:00
unsubscribe(subscriber: TelemetrySubscriber);
}
export class TelemetrySubscriber {
private dataSubject = new ReplaySubject<SubscriptionUpdate>(1);
private entityDataSubject = new ReplaySubject<EntityDataUpdate>(1);
2020-07-03 18:33:06 +03:00
private alarmDataSubject = new ReplaySubject<AlarmDataUpdate>(1);
2019-08-30 19:19:45 +03:00
private reconnectSubject = new Subject();
private zone: NgZone;
public subscriptionCommands: Array<WebsocketCmd>;
2019-08-30 19:19:45 +03:00
public data$ = this.dataSubject.asObservable();
public entityData$ = this.entityDataSubject.asObservable();
2020-07-03 18:33:06 +03:00
public alarmData$ = this.alarmDataSubject.asObservable();
2019-08-30 19:19:45 +03:00
public reconnect$ = this.reconnectSubject.asObservable();
public static createEntityAttributesSubscription(telemetryService: TelemetryService,
entityId: EntityId, attributeScope: TelemetryType,
zone: NgZone, keys: string[] = null): TelemetrySubscriber {
2019-08-30 19:19:45 +03:00
let subscriptionCommand: SubscriptionCmd;
if (attributeScope === LatestTelemetry.LATEST_TELEMETRY) {
subscriptionCommand = new TimeseriesSubscriptionCmd();
} else {
subscriptionCommand = new AttributesSubscriptionCmd();
}
subscriptionCommand.entityType = entityId.entityType as EntityType;
subscriptionCommand.entityId = entityId.id;
subscriptionCommand.scope = attributeScope as AttributeScope;
if (keys) {
subscriptionCommand.keys = keys.join(',');
}
const subscriber = new TelemetrySubscriber(telemetryService);
subscriber.zone = zone;
2019-08-30 19:19:45 +03:00
subscriber.subscriptionCommands.push(subscriptionCommand);
return subscriber;
}
constructor(private telemetryService: TelemetryService) {
this.subscriptionCommands = [];
}
public subscribe() {
this.telemetryService.subscribe(this);
}
2020-06-22 18:55:15 +03:00
public update() {
this.telemetryService.update(this);
}
2019-08-30 19:19:45 +03:00
public unsubscribe() {
this.telemetryService.unsubscribe(this);
this.complete();
}
public complete() {
2019-08-30 19:19:45 +03:00
this.dataSubject.complete();
this.entityDataSubject.complete();
2020-07-03 18:33:06 +03:00
this.alarmDataSubject.complete();
2019-08-30 19:19:45 +03:00
this.reconnectSubject.complete();
}
public onData(message: SubscriptionUpdate) {
const cmdId = message.subscriptionId;
let keys: string[];
const cmd = this.subscriptionCommands.find((command) => command.cmdId === cmdId);
if (cmd) {
const telemetryPluginCmd = cmd as TelemetryPluginCmd;
if (telemetryPluginCmd.keys && telemetryPluginCmd.keys.length) {
keys = telemetryPluginCmd.keys.split(',');
2019-08-30 19:19:45 +03:00
}
}
message.prepareData(keys);
if (this.zone) {
this.zone.run(
() => {
this.dataSubject.next(message);
}
);
} else {
this.dataSubject.next(message);
}
2019-08-30 19:19:45 +03:00
}
public onEntityData(message: EntityDataUpdate) {
if (this.zone) {
this.zone.run(
() => {
this.entityDataSubject.next(message);
}
);
} else {
this.entityDataSubject.next(message);
}
}
2020-07-03 18:33:06 +03:00
public onAlarmData(message: AlarmDataUpdate) {
if (this.zone) {
this.zone.run(
() => {
this.alarmDataSubject.next(message);
}
);
} else {
this.alarmDataSubject.next(message);
}
}
2019-08-30 19:19:45 +03:00
public onReconnected() {
this.reconnectSubject.next();
}
public attributeData$(): Observable<Array<AttributeData>> {
const attributeData = new Array<AttributeData>();
return this.data$.pipe(
map((message) => message.updateAttributeData(attributeData))
);
}
}