thingsboard/ui-ngx/src/app/core/ws/telemetry-websocket.service.ts

171 lines
6.5 KiB
TypeScript
Raw Normal View History

2019-08-30 19:19:45 +03:00
///
2024-01-09 10:46:16 +02:00
/// Copyright © 2016-2024 The Thingsboard Authors
2019-08-30 19:19:45 +03:00
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import { Inject, Injectable, NgZone } from '@angular/core';
2019-08-30 19:19:45 +03:00
import {
AlarmCountCmd,
AlarmCountUnsubscribeCmd,
AlarmCountUpdate,
2021-03-02 12:04:45 +02:00
AlarmDataCmd,
AlarmDataUnsubscribeCmd,
2020-07-03 18:33:06 +03:00
AlarmDataUpdate,
2023-02-16 12:59:43 +02:00
EntityCountCmd,
EntityCountUnsubscribeCmd,
2021-03-02 12:04:45 +02:00
EntityCountUpdate,
EntityDataCmd,
EntityDataUnsubscribeCmd,
EntityDataUpdate,
isAlarmCountUpdateMsg,
2021-03-02 12:04:45 +02:00
isAlarmDataUpdateMsg,
isEntityCountUpdateMsg,
isEntityDataUpdateMsg,
2023-12-11 13:30:34 +02:00
isNotificationCountUpdateMsg,
isNotificationsUpdateMsg,
MarkAllAsReadCmd,
MarkAsReadCmd,
NotificationCountUpdate,
2023-12-11 13:30:34 +02:00
NotificationSubscriber,
NotificationsUpdate,
2019-08-30 19:19:45 +03:00
SubscriptionCmd,
SubscriptionUpdate,
TelemetryPluginCmdsWrapper,
TelemetrySubscriber,
2023-12-11 13:30:34 +02:00
UnreadCountSubCmd,
UnreadSubCmd,
UnsubscribeCmd,
2021-03-02 12:04:45 +02:00
WebsocketDataMsg
2019-08-30 19:19:45 +03:00
} from '@app/shared/models/telemetry/telemetry.models';
2023-02-16 12:59:43 +02:00
import { Store } from '@ngrx/store';
2019-08-30 19:19:45 +03:00
import { AppState } from '@core/core.state';
import { AuthService } from '@core/auth/auth.service';
import { WINDOW } from '@core/services/window.service';
2023-02-16 12:59:43 +02:00
import { WebsocketService } from '@core/ws/websocket.service';
2019-08-30 19:19:45 +03:00
2019-12-23 14:36:44 +02:00
// @dynamic
2019-08-30 19:19:45 +03:00
@Injectable({
providedIn: 'root'
})
2023-02-16 12:59:43 +02:00
export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscriber> {
2019-08-30 19:19:45 +03:00
2023-02-16 12:59:43 +02:00
cmdWrapper: TelemetryPluginCmdsWrapper;
2019-08-30 19:19:45 +03:00
2023-02-16 12:59:43 +02:00
constructor(protected store: Store<AppState>,
protected authService: AuthService,
protected ngZone: NgZone,
@Inject(WINDOW) protected window: Window) {
super(store, authService, ngZone, 'api/ws', new TelemetryPluginCmdsWrapper(), window);
2019-08-30 19:19:45 +03:00
}
public subscribe(subscriber: TelemetrySubscriber) {
this.isActive = true;
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
const cmdId = this.nextCmdId();
if (!(subscriptionCommand instanceof MarkAsReadCmd) && !(subscriptionCommand instanceof MarkAllAsReadCmd)) {
this.subscribersMap.set(cmdId, subscriber);
2019-08-30 19:19:45 +03:00
}
subscriptionCommand.cmdId = cmdId;
this.cmdWrapper.cmds.push(subscriptionCommand);
2019-08-30 19:19:45 +03:00
}
);
this.subscribersCount++;
this.publishCommands();
}
2020-06-22 18:55:15 +03:00
public update(subscriber: TelemetrySubscriber) {
2020-06-24 12:08:14 +03:00
if (!this.isReconnect) {
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
if (subscriptionCommand.cmdId && (subscriptionCommand instanceof EntityDataCmd || subscriptionCommand instanceof UnreadSubCmd)) {
this.cmdWrapper.cmds.push(subscriptionCommand);
2020-06-24 12:08:14 +03:00
}
2020-06-22 18:55:15 +03:00
}
2020-06-24 12:08:14 +03:00
);
this.publishCommands();
}
2020-06-22 18:55:15 +03:00
}
2019-08-30 19:19:45 +03:00
public unsubscribe(subscriber: TelemetrySubscriber) {
if (this.isActive) {
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
if (subscriptionCommand instanceof SubscriptionCmd) {
subscriptionCommand.unsubscribe = true;
this.cmdWrapper.cmds.push(subscriptionCommand);
} else if (subscriptionCommand instanceof EntityDataCmd) {
const entityDataUnsubscribeCmd = new EntityDataUnsubscribeCmd();
entityDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdWrapper.cmds.push(entityDataUnsubscribeCmd);
2020-07-03 18:33:06 +03:00
} else if (subscriptionCommand instanceof AlarmDataCmd) {
const alarmDataUnsubscribeCmd = new AlarmDataUnsubscribeCmd();
alarmDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdWrapper.cmds.push(alarmDataUnsubscribeCmd);
2021-03-02 12:04:45 +02:00
} else if (subscriptionCommand instanceof EntityCountCmd) {
const entityCountUnsubscribeCmd = new EntityCountUnsubscribeCmd();
entityCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdWrapper.cmds.push(entityCountUnsubscribeCmd);
} else if (subscriptionCommand instanceof AlarmCountCmd) {
const alarmCountUnsubscribeCmd = new AlarmCountUnsubscribeCmd();
alarmCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdWrapper.cmds.push(alarmCountUnsubscribeCmd);
} else if (subscriptionCommand instanceof UnreadCountSubCmd || subscriptionCommand instanceof UnreadSubCmd) {
const notificationsUnsubCmds = new UnsubscribeCmd();
notificationsUnsubCmds.cmdId = subscriptionCommand.cmdId;
this.cmdWrapper.cmds.push(notificationsUnsubCmds);
2019-08-30 19:19:45 +03:00
}
const cmdId = subscriptionCommand.cmdId;
if (cmdId) {
this.subscribersMap.delete(cmdId);
}
}
);
this.reconnectSubscribers.delete(subscriber);
this.subscribersCount--;
this.publishCommands();
}
}
2023-02-16 12:59:43 +02:00
processOnMessage(message: WebsocketDataMsg) {
let subscriber: TelemetrySubscriber | NotificationSubscriber;
if ('cmdId' in message && message.cmdId) {
2023-02-16 12:59:43 +02:00
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber instanceof NotificationSubscriber) {
2023-12-11 13:30:34 +02:00
if (isNotificationCountUpdateMsg(message)) {
subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message));
2023-12-11 13:30:34 +02:00
} else if (isNotificationsUpdateMsg(message)) {
subscriber.onNotificationsUpdate(new NotificationsUpdate(message));
}
} else if (subscriber instanceof TelemetrySubscriber) {
2023-12-11 13:30:34 +02:00
if (isEntityDataUpdateMsg(message)) {
subscriber.onEntityData(new EntityDataUpdate(message));
2023-12-11 13:30:34 +02:00
} else if (isAlarmDataUpdateMsg(message)) {
subscriber.onAlarmData(new AlarmDataUpdate(message));
2023-12-11 13:30:34 +02:00
} else if (isEntityCountUpdateMsg(message)) {
subscriber.onEntityCount(new EntityCountUpdate(message));
2023-12-11 13:30:34 +02:00
} else if (isAlarmCountUpdateMsg(message)) {
subscriber.onAlarmCount(new AlarmCountUpdate(message));
}
}
} else if ('subscriptionId' in message && message.subscriptionId) {
subscriber = this.subscribersMap.get(message.subscriptionId) as TelemetrySubscriber;
2023-02-16 12:59:43 +02:00
if (subscriber) {
subscriber.onData(new SubscriptionUpdate(message));
2019-08-30 19:19:45 +03:00
}
}
}
}