diff --git a/ui-ngx/src/app/core/api/alarm-data-subscription.ts b/ui-ngx/src/app/core/api/alarm-data-subscription.ts index 3eb17c4dca..f9efe92d76 100644 --- a/ui-ngx/src/app/core/api/alarm-data-subscription.ts +++ b/ui-ngx/src/app/core/api/alarm-data-subscription.ts @@ -17,7 +17,6 @@ import { AlarmDataCmd, DataKeyType, - TelemetryService, TelemetrySubscriber } from '@shared/models/telemetry/telemetry.models'; import { DatasourceType } from '@shared/models/widget.models'; @@ -34,6 +33,7 @@ import { AlarmDataListener } from '@core/api/alarm-data.service'; import { PageData } from '@shared/models/page/page-data'; import { deepClone, isDefined, isDefinedAndNotNull, isObject } from '@core/utils'; import { simulatedAlarm } from '@shared/models/alarm.models'; +import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service'; export interface AlarmSubscriptionDataKey { name: string; @@ -68,7 +68,7 @@ export class AlarmDataSubscription { private subsTw: SubscriptionTimewindow; constructor(private listener: AlarmDataListener, - private telemetryService: TelemetryService) { + private telemetryService: TelemetryWebsocketService) { } public unsubscribe() { diff --git a/ui-ngx/src/app/core/api/entity-data-subscription.ts b/ui-ngx/src/app/core/api/entity-data-subscription.ts index b92d4d2d9a..a49ce76470 100644 --- a/ui-ngx/src/app/core/api/entity-data-subscription.ts +++ b/ui-ngx/src/app/core/api/entity-data-subscription.ts @@ -41,7 +41,6 @@ import { EntityDataCmd, IndexedSubscriptionData, SubscriptionData, - TelemetryService, TelemetrySubscriber } from '@shared/models/telemetry/telemetry.models'; import { UtilsService } from '@core/services/utils.service'; @@ -53,6 +52,7 @@ import { NULL_UUID } from '@shared/models/id/has-uuid'; import { EntityType } from '@shared/models/entity-type.models'; import { Observable, of, ReplaySubject, Subject } from 'rxjs'; import { EntityId } from '@shared/models/id/entity-id'; +import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service'; import Timeout = NodeJS.Timeout; declare type DataKeyFunction = (time: number, prevValue: any) => any; @@ -96,7 +96,7 @@ export interface EntityDataSubscriptionOptions { export class EntityDataSubscription { constructor(private listener: EntityDataListener, - private telemetryService: TelemetryService, + private telemetryService: TelemetryWebsocketService, private utils: UtilsService) { this.initializeSubscription(); } diff --git a/ui-ngx/src/app/core/ws/notification-websocket.service.ts b/ui-ngx/src/app/core/ws/notification-websocket.service.ts index e514d5ecde..608fe2dc5c 100644 --- a/ui-ngx/src/app/core/ws/notification-websocket.service.ts +++ b/ui-ngx/src/app/core/ws/notification-websocket.service.ts @@ -15,81 +15,41 @@ /// import { Inject, Injectable, NgZone } from '@angular/core'; -import { select, Store } from '@ngrx/store'; +import { Store } from '@ngrx/store'; import { AppState } from '@core/core.state'; import { AuthService } from '@core/auth/auth.service'; -import { selectIsAuthenticated } from '@core/auth/auth.selectors'; import { WINDOW } from '@core/services/window.service'; -import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; -import { ActionNotificationShow } from '@core/notification/notification.actions'; import { isNotificationCountUpdateMsg, isNotificationsUpdateMsg, MarkAllAsReadCmd, MarkAsReadCmd, NotificationCountUpdate, - NotificationPluginCmdsWrapper, + NotificationPluginCmdWrapper, NotificationSubscriber, NotificationsUpdate, - NotificationWsService, UnreadCountSubCmd, UnreadSubCmd, UnsubscribeCmd, WebsocketNotificationMsg -} from '@shared/models/notification-ws.models'; -import Timeout = NodeJS.Timeout; +} from '@shared/models/websocket/notification-ws.models'; +import { WebsocketService } from '@core/ws/websocket.service'; -const RECONNECT_INTERVAL = 2000; -const WS_IDLE_TIMEOUT = 90000; // @dynamic @Injectable({ providedIn: 'root' }) -export class NotificationWebsocketService implements NotificationWsService { +export class NotificationWebsocketService extends WebsocketService { - isActive = false; - isOpening = false; - isOpened = false; - isReconnect = false; + cmdWrapper: NotificationPluginCmdWrapper; - socketCloseTimer: Timeout; - reconnectTimer: Timeout; - - lastCmdId = 0; - subscribersCount = 0; - subscribersMap = new Map(); - - reconnectSubscribers = new Set(); - - cmdsWrapper = new NotificationPluginCmdsWrapper(); - notificationUri: string; - - dataStream: WebSocketSubject; - - constructor(private store: Store, - private authService: AuthService, - private ngZone: NgZone, - @Inject(WINDOW) private window: Window) { - this.store.pipe(select(selectIsAuthenticated)).subscribe( - () => { - this.reset(true); - } - ); - - let port = this.window.location.port; - if (this.window.location.protocol === 'https:') { - if (!port) { - port = '443'; - } - this.notificationUri = 'wss:'; - } else { - if (!port) { - port = '80'; - } - this.notificationUri = 'ws:'; - } - this.notificationUri += `//${this.window.location.hostname}:${port}/api/ws/plugins/notifications`; + constructor(protected store: Store, + protected authService: AuthService, + protected ngZone: NgZone, + @Inject(WINDOW) protected window: Window) { + super(store, authService, ngZone, 'api/ws/plugins/notifications', new NotificationPluginCmdWrapper(), window); + this.errorName = 'WebSocket Notification Error'; } public subscribe(subscriber: NotificationSubscriber) { @@ -100,19 +60,19 @@ export class NotificationWebsocketService implements NotificationWsService { this.subscribersMap.set(cmdId, subscriber); subscriptionCommand.cmdId = cmdId; if (subscriptionCommand instanceof UnreadCountSubCmd) { - this.cmdsWrapper.unreadCountSubCmd = subscriptionCommand; + this.cmdWrapper.unreadCountSubCmd = subscriptionCommand; } else if (subscriptionCommand instanceof UnreadSubCmd) { - this.cmdsWrapper.unreadSubCmd = subscriptionCommand; + this.cmdWrapper.unreadSubCmd = subscriptionCommand; } else if (subscriptionCommand instanceof MarkAsReadCmd) { - this.cmdsWrapper.markAsReadCmd = subscriptionCommand; + this.cmdWrapper.markAsReadCmd = subscriptionCommand; this.subscribersMap.delete(cmdId); } else if (subscriptionCommand instanceof MarkAllAsReadCmd) { - this.cmdsWrapper.markAllAsReadCmd = subscriptionCommand; + this.cmdWrapper.markAllAsReadCmd = subscriptionCommand; this.subscribersMap.delete(cmdId); } } ); - if (this.cmdsWrapper.markAsReadCmd || this.cmdsWrapper.markAllAsReadCmd) { + if (this.cmdWrapper.markAsReadCmd || this.cmdWrapper.markAllAsReadCmd) { this.subscribersCount++; } this.publishCommands(); @@ -123,7 +83,7 @@ export class NotificationWebsocketService implements NotificationWsService { subscriber.subscriptionCommands.forEach( (subscriptionCommand) => { if (subscriptionCommand.cmdId && subscriptionCommand instanceof UnreadSubCmd) { - this.cmdsWrapper.unreadSubCmd = subscriptionCommand; + this.cmdWrapper.unreadSubCmd = subscriptionCommand; } } ); @@ -139,7 +99,7 @@ export class NotificationWebsocketService implements NotificationWsService { || subscriptionCommand instanceof UnreadSubCmd) { const unreadCountUnsubscribeCmd = new UnsubscribeCmd(); unreadCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId; - this.cmdsWrapper.unsubCmd = unreadCountUnsubscribeCmd; + this.cmdWrapper.unsubCmd = unreadCountUnsubscribeCmd; } const cmdId = subscriptionCommand.cmdId; if (cmdId) { @@ -153,184 +113,19 @@ export class NotificationWebsocketService implements NotificationWsService { } } - private nextCmdId(): number { - this.lastCmdId++; - return this.lastCmdId; - } - - private publishCommands() { - while (this.isOpened && this.cmdsWrapper.hasCommands()) { - this.dataStream.next(this.cmdsWrapper.preparePublishCommands()); - this.cmdsWrapper.clear(); - this.checkToClose(); - } - this.tryOpenSocket(); - } - - private checkToClose() { - if (this.subscribersCount === 0 && this.isOpened) { - if (!this.socketCloseTimer) { - this.socketCloseTimer = setTimeout( - () => this.closeSocket(), WS_IDLE_TIMEOUT); + processOnMessage(message: WebsocketNotificationMsg) { + let subscriber: NotificationSubscriber; + if (isNotificationCountUpdateMsg(message)) { + subscriber = this.subscribersMap.get(message.cmdId); + if (subscriber) { + subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message)); + } + } else if (isNotificationsUpdateMsg(message)) { + subscriber = this.subscribersMap.get(message.cmdId); + if (subscriber) { + subscriber.onNotificationsUpdate(new NotificationsUpdate(message)); } } } - private reset(close: boolean) { - if (this.socketCloseTimer) { - clearTimeout(this.socketCloseTimer); - this.socketCloseTimer = null; - } - this.lastCmdId = 0; - this.subscribersMap.clear(); - this.subscribersCount = 0; - this.cmdsWrapper.clear(); - if (close) { - this.closeSocket(); - } - } - - private closeSocket() { - this.isActive = false; - if (this.isOpened) { - this.dataStream.unsubscribe(); - } - } - - private tryOpenSocket() { - if (this.isActive) { - if (!this.isOpened && !this.isOpening) { - this.isOpening = true; - if (AuthService.isJwtTokenValid()) { - this.openSocket(AuthService.getJwtToken()); - } else { - this.authService.refreshJwtToken().subscribe(() => { - this.openSocket(AuthService.getJwtToken()); - }, - () => { - this.isOpening = false; - this.authService.logout(true, true); - } - ); - } - } - if (this.socketCloseTimer) { - clearTimeout(this.socketCloseTimer); - this.socketCloseTimer = null; - } - } - } - - private openSocket(token: string) { - const uri = `${this.notificationUri}?token=${token}`; - this.dataStream = webSocket( - { - url: uri, - openObserver: { - next: () => { - this.onOpen(); - } - }, - closeObserver: { - next: (e: CloseEvent) => { - this.onClose(e); - } - } - } - ); - - this.dataStream.subscribe((message) => { - this.ngZone.runOutsideAngular(() => { - this.onMessage(message as WebsocketNotificationMsg); - }); - }, - (error) => { - this.onError(error); - }); - } - - private onOpen() { - this.isOpening = false; - this.isOpened = true; - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - if (this.isReconnect) { - this.isReconnect = false; - this.reconnectSubscribers.forEach( - (reconnectSubscriber) => { - reconnectSubscriber.onReconnected(); - this.subscribe(reconnectSubscriber); - } - ); - this.reconnectSubscribers.clear(); - } else { - this.publishCommands(); - } - } - - private onMessage(message: WebsocketNotificationMsg) { - if (message.errorCode) { - this.showWsError(message.errorCode, message.errorMsg); - } else { - let subscriber: NotificationSubscriber; - if (isNotificationCountUpdateMsg(message)) { - subscriber = this.subscribersMap.get(message.cmdId); - if (subscriber) { - subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message)); - } - } else if (isNotificationsUpdateMsg(message)) { - subscriber = this.subscribersMap.get(message.cmdId); - if (subscriber) { - subscriber.onNotificationsUpdate(new NotificationsUpdate(message)); - } - } - } - this.checkToClose(); - } - - private onError(errorEvent) { - if (errorEvent) { - console.warn('WebSocket error event', errorEvent); - } - this.isOpening = false; - } - - private onClose(closeEvent: CloseEvent) { - if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006 - && closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) { - this.showWsError(closeEvent.code, closeEvent.reason); - } - this.isOpening = false; - this.isOpened = false; - if (this.isActive) { - if (!this.isReconnect) { - this.reconnectSubscribers.clear(); - this.subscribersMap.forEach( - (subscriber) => { - this.reconnectSubscribers.add(subscriber); - } - ); - this.reset(false); - this.isReconnect = true; - } - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - } - this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL); - } - } - - private showWsError(errorCode: number, errorMsg: string) { - let message = errorMsg; - if (!message) { - message += `WebSocket Notification Error: error code - ${errorCode}.`; - } - this.store.dispatch(new ActionNotificationShow( - { - message, type: 'error' - })); - } - } diff --git a/ui-ngx/src/app/core/ws/telemetry-websocket.service.ts b/ui-ngx/src/app/core/ws/telemetry-websocket.service.ts index 37a200262d..c98d0f429c 100644 --- a/ui-ngx/src/app/core/ws/telemetry-websocket.service.ts +++ b/ui-ngx/src/app/core/ws/telemetry-websocket.service.ts @@ -20,7 +20,8 @@ import { AlarmDataUnsubscribeCmd, AlarmDataUpdate, AttributesSubscriptionCmd, - EntityCountCmd, EntityCountUnsubscribeCmd, + EntityCountCmd, + EntityCountUnsubscribeCmd, EntityCountUpdate, EntityDataCmd, EntityDataUnsubscribeCmd, @@ -33,72 +34,29 @@ import { SubscriptionUpdate, TelemetryFeature, TelemetryPluginCmdsWrapper, - TelemetryService, TelemetrySubscriber, TimeseriesSubscriptionCmd, WebsocketDataMsg } from '@app/shared/models/telemetry/telemetry.models'; -import { select, Store } from '@ngrx/store'; +import { Store } from '@ngrx/store'; import { AppState } from '@core/core.state'; import { AuthService } from '@core/auth/auth.service'; -import { selectIsAuthenticated } from '@core/auth/auth.selectors'; import { WINDOW } from '@core/services/window.service'; -import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; -import { ActionNotificationShow } from '@core/notification/notification.actions'; -import Timeout = NodeJS.Timeout; - -const RECONNECT_INTERVAL = 2000; -const WS_IDLE_TIMEOUT = 90000; -const MAX_PUBLISH_COMMANDS = 10; +import { WebsocketService } from '@core/ws/websocket.service'; // @dynamic @Injectable({ providedIn: 'root' }) -export class TelemetryWebsocketService implements TelemetryService { +export class TelemetryWebsocketService extends WebsocketService { - isActive = false; - isOpening = false; - isOpened = false; - isReconnect = false; + cmdWrapper: TelemetryPluginCmdsWrapper; - socketCloseTimer: Timeout; - reconnectTimer: Timeout; - - lastCmdId = 0; - subscribersCount = 0; - subscribersMap = new Map(); - - reconnectSubscribers = new Set(); - - cmdsWrapper = new TelemetryPluginCmdsWrapper(); - telemetryUri: string; - - dataStream: WebSocketSubject; - - constructor(private store: Store, - private authService: AuthService, - private ngZone: NgZone, - @Inject(WINDOW) private window: Window) { - this.store.pipe(select(selectIsAuthenticated)).subscribe( - () => { - this.reset(true); - } - ); - - let port = this.window.location.port; - if (this.window.location.protocol === 'https:') { - if (!port) { - port = '443'; - } - this.telemetryUri = 'wss:'; - } else { - if (!port) { - port = '80'; - } - this.telemetryUri = 'ws:'; - } - this.telemetryUri += `//${this.window.location.hostname}:${port}/api/ws/plugins/telemetry`; + constructor(protected store: Store, + protected authService: AuthService, + protected ngZone: NgZone, + @Inject(WINDOW) protected window: Window) { + super(store, authService, ngZone, 'api/ws/plugins/telemetry', new TelemetryPluginCmdsWrapper(), window); } public subscribe(subscriber: TelemetrySubscriber) { @@ -110,18 +68,18 @@ export class TelemetryWebsocketService implements TelemetryService { subscriptionCommand.cmdId = cmdId; if (subscriptionCommand instanceof SubscriptionCmd) { if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) { - this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd); + this.cmdWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd); } else { - this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd); + this.cmdWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd); } } else if (subscriptionCommand instanceof GetHistoryCmd) { - this.cmdsWrapper.historyCmds.push(subscriptionCommand); + this.cmdWrapper.historyCmds.push(subscriptionCommand); } else if (subscriptionCommand instanceof EntityDataCmd) { - this.cmdsWrapper.entityDataCmds.push(subscriptionCommand); + this.cmdWrapper.entityDataCmds.push(subscriptionCommand); } else if (subscriptionCommand instanceof AlarmDataCmd) { - this.cmdsWrapper.alarmDataCmds.push(subscriptionCommand); + this.cmdWrapper.alarmDataCmds.push(subscriptionCommand); } else if (subscriptionCommand instanceof EntityCountCmd) { - this.cmdsWrapper.entityCountCmds.push(subscriptionCommand); + this.cmdWrapper.entityCountCmds.push(subscriptionCommand); } } ); @@ -134,7 +92,7 @@ export class TelemetryWebsocketService implements TelemetryService { subscriber.subscriptionCommands.forEach( (subscriptionCommand) => { if (subscriptionCommand.cmdId && subscriptionCommand instanceof EntityDataCmd) { - this.cmdsWrapper.entityDataCmds.push(subscriptionCommand); + this.cmdWrapper.entityDataCmds.push(subscriptionCommand); } } ); @@ -149,22 +107,22 @@ export class TelemetryWebsocketService implements TelemetryService { if (subscriptionCommand instanceof SubscriptionCmd) { subscriptionCommand.unsubscribe = true; if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) { - this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd); + this.cmdWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd); } else { - this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd); + this.cmdWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd); } } else if (subscriptionCommand instanceof EntityDataCmd) { const entityDataUnsubscribeCmd = new EntityDataUnsubscribeCmd(); entityDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId; - this.cmdsWrapper.entityDataUnsubscribeCmds.push(entityDataUnsubscribeCmd); + this.cmdWrapper.entityDataUnsubscribeCmds.push(entityDataUnsubscribeCmd); } else if (subscriptionCommand instanceof AlarmDataCmd) { const alarmDataUnsubscribeCmd = new AlarmDataUnsubscribeCmd(); alarmDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId; - this.cmdsWrapper.alarmDataUnsubscribeCmds.push(alarmDataUnsubscribeCmd); + this.cmdWrapper.alarmDataUnsubscribeCmds.push(alarmDataUnsubscribeCmd); } else if (subscriptionCommand instanceof EntityCountCmd) { const entityCountUnsubscribeCmd = new EntityCountUnsubscribeCmd(); entityCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId; - this.cmdsWrapper.entityCountUnsubscribeCmds.push(entityCountUnsubscribeCmd); + this.cmdWrapper.entityCountUnsubscribeCmds.push(entityCountUnsubscribeCmd); } const cmdId = subscriptionCommand.cmdId; if (cmdId) { @@ -178,193 +136,29 @@ export class TelemetryWebsocketService implements TelemetryService { } } - private nextCmdId(): number { - this.lastCmdId++; - return this.lastCmdId; - } - - private publishCommands() { - while (this.isOpened && this.cmdsWrapper.hasCommands()) { - this.dataStream.next(this.cmdsWrapper.preparePublishCommands(MAX_PUBLISH_COMMANDS)); - this.checkToClose(); - } - this.tryOpenSocket(); - } - - private checkToClose() { - if (this.subscribersCount === 0 && this.isOpened) { - if (!this.socketCloseTimer) { - this.socketCloseTimer = setTimeout( - () => this.closeSocket(), WS_IDLE_TIMEOUT); + processOnMessage(message: WebsocketDataMsg) { + let subscriber: TelemetrySubscriber; + if (isEntityDataUpdateMsg(message)) { + subscriber = this.subscribersMap.get(message.cmdId); + if (subscriber) { + subscriber.onEntityData(new EntityDataUpdate(message)); + } + } else if (isAlarmDataUpdateMsg(message)) { + subscriber = this.subscribersMap.get(message.cmdId); + if (subscriber) { + subscriber.onAlarmData(new AlarmDataUpdate(message)); + } + } else if (isEntityCountUpdateMsg(message)) { + subscriber = this.subscribersMap.get(message.cmdId); + if (subscriber) { + subscriber.onEntityCount(new EntityCountUpdate(message)); + } + } else if (message.subscriptionId) { + subscriber = this.subscribersMap.get(message.subscriptionId); + if (subscriber) { + subscriber.onData(new SubscriptionUpdate(message)); } } } - private reset(close: boolean) { - if (this.socketCloseTimer) { - clearTimeout(this.socketCloseTimer); - this.socketCloseTimer = null; - } - this.lastCmdId = 0; - this.subscribersMap.clear(); - this.subscribersCount = 0; - this.cmdsWrapper.clear(); - if (close) { - this.closeSocket(); - } - } - - private closeSocket() { - this.isActive = false; - if (this.isOpened) { - this.dataStream.unsubscribe(); - } - } - - private tryOpenSocket() { - if (this.isActive) { - if (!this.isOpened && !this.isOpening) { - this.isOpening = true; - if (AuthService.isJwtTokenValid()) { - this.openSocket(AuthService.getJwtToken()); - } else { - this.authService.refreshJwtToken().subscribe(() => { - this.openSocket(AuthService.getJwtToken()); - }, - () => { - this.isOpening = false; - this.authService.logout(true, true); - } - ); - } - } - if (this.socketCloseTimer) { - clearTimeout(this.socketCloseTimer); - this.socketCloseTimer = null; - } - } - } - - private openSocket(token: string) { - const uri = `${this.telemetryUri}?token=${token}`; - this.dataStream = webSocket( - { - url: uri, - openObserver: { - next: () => { - this.onOpen(); - } - }, - closeObserver: { - next: (e: CloseEvent) => { - this.onClose(e); - } - } - } - ); - - this.dataStream.subscribe((message) => { - this.ngZone.runOutsideAngular(() => { - this.onMessage(message as WebsocketDataMsg); - }); - }, - (error) => { - this.onError(error); - }); - } - - private onOpen() { - this.isOpening = false; - this.isOpened = true; - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - if (this.isReconnect) { - this.isReconnect = false; - this.reconnectSubscribers.forEach( - (reconnectSubscriber) => { - reconnectSubscriber.onReconnected(); - this.subscribe(reconnectSubscriber); - } - ); - this.reconnectSubscribers.clear(); - } else { - this.publishCommands(); - } - } - - private onMessage(message: WebsocketDataMsg) { - if (message.errorCode) { - this.showWsError(message.errorCode, message.errorMsg); - } else { - let subscriber: TelemetrySubscriber; - if (isEntityDataUpdateMsg(message)) { - subscriber = this.subscribersMap.get(message.cmdId); - if (subscriber) { - subscriber.onEntityData(new EntityDataUpdate(message)); - } - } else if (isAlarmDataUpdateMsg(message)) { - subscriber = this.subscribersMap.get(message.cmdId); - if (subscriber) { - subscriber.onAlarmData(new AlarmDataUpdate(message)); - } - } else if (isEntityCountUpdateMsg(message)) { - subscriber = this.subscribersMap.get(message.cmdId); - if (subscriber) { - subscriber.onEntityCount(new EntityCountUpdate(message)); - } - } else if (message.subscriptionId) { - subscriber = this.subscribersMap.get(message.subscriptionId); - if (subscriber) { - subscriber.onData(new SubscriptionUpdate(message)); - } - } - } - this.checkToClose(); - } - - private onError(errorEvent) { - if (errorEvent) { - console.warn('WebSocket error event', errorEvent); - } - this.isOpening = false; - } - - private onClose(closeEvent: CloseEvent) { - if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006 - && closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) { - this.showWsError(closeEvent.code, closeEvent.reason); - } - this.isOpening = false; - this.isOpened = false; - if (this.isActive) { - if (!this.isReconnect) { - this.reconnectSubscribers.clear(); - this.subscribersMap.forEach( - (subscriber) => { - this.reconnectSubscribers.add(subscriber); - } - ); - this.reset(false); - this.isReconnect = true; - } - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - } - this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL); - } - } - - private showWsError(errorCode: number, errorMsg: string) { - let message = errorMsg; - if (!message) { - message += `WebSocket Error: error code - ${errorCode}.`; - } - this.store.dispatch(new ActionNotificationShow( - { - message, type: 'error' - })); - } - } diff --git a/ui-ngx/src/app/core/ws/websocket.service.ts b/ui-ngx/src/app/core/ws/websocket.service.ts new file mode 100644 index 0000000000..aeb75f4c4c --- /dev/null +++ b/ui-ngx/src/app/core/ws/websocket.service.ts @@ -0,0 +1,245 @@ +import { CmdWrapper, WsService, WsSubscriber } from '@shared/models/websocket/websocket.models'; +import { select, Store } from '@ngrx/store'; +import { AppState } from '@core/core.state'; +import { AuthService } from '@core/auth/auth.service'; +import { NgZone } from '@angular/core'; +import { selectIsAuthenticated } from '@core/auth/auth.selectors'; +import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; +import { WebsocketNotificationMsg } from '@shared/models/websocket/notification-ws.models'; +import { CmdUpdateMsg } from '@shared/models/telemetry/telemetry.models'; +import { ActionNotificationShow } from '@core/notification/notification.actions'; +import Timeout = NodeJS.Timeout; + +const RECONNECT_INTERVAL = 2000; +const WS_IDLE_TIMEOUT = 90000; +const MAX_PUBLISH_COMMANDS = 10; + +export abstract class WebsocketService implements WsService { + + isActive = false; + isOpening = false; + isOpened = false; + isReconnect = false; + + socketCloseTimer: Timeout; + reconnectTimer: Timeout; + + lastCmdId = 0; + subscribersCount = 0; + subscribersMap = new Map(); + + reconnectSubscribers = new Set(); + + notificationUri: string; + + dataStream: WebSocketSubject; + + errorName = 'WebSocket Error'; + + protected constructor(protected store: Store, + protected authService: AuthService, + protected ngZone: NgZone, + protected apiEndpoint: string, + protected cmdWrapper: CmdWrapper, + protected window: Window) { + this.store.pipe(select(selectIsAuthenticated)).subscribe( + () => { + this.reset(true); + } + ); + + let port = this.window.location.port; + if (this.window.location.protocol === 'https:') { + if (!port) { + port = '443'; + } + this.notificationUri = 'wss:'; + } else { + if (!port) { + port = '80'; + } + this.notificationUri = 'ws:'; + } + this.notificationUri += `//${this.window.location.hostname}:${port}/${apiEndpoint}`; + } + + abstract subscribe(subscriber: T); + + abstract update(subscriber: T); + + abstract unsubscribe(subscriber: T); + + abstract processOnMessage(message: any); + + protected nextCmdId(): number { + this.lastCmdId++; + return this.lastCmdId; + } + + protected publishCommands() { + while (this.isOpened && this.cmdWrapper.hasCommands()) { + this.dataStream.next(this.cmdWrapper.preparePublishCommands(MAX_PUBLISH_COMMANDS)); + this.checkToClose(); + } + this.tryOpenSocket(); + } + + private checkToClose() { + if (this.subscribersCount === 0 && this.isOpened) { + if (!this.socketCloseTimer) { + this.socketCloseTimer = setTimeout( + () => this.closeSocket(), WS_IDLE_TIMEOUT); + } + } + } + + private reset(close: boolean) { + if (this.socketCloseTimer) { + clearTimeout(this.socketCloseTimer); + this.socketCloseTimer = null; + } + this.lastCmdId = 0; + this.subscribersMap.clear(); + this.subscribersCount = 0; + this.cmdWrapper.clear(); + if (close) { + this.closeSocket(); + } + } + + private closeSocket() { + this.isActive = false; + if (this.isOpened) { + this.dataStream.unsubscribe(); + } + } + + private tryOpenSocket() { + if (this.isActive) { + if (!this.isOpened && !this.isOpening) { + this.isOpening = true; + if (AuthService.isJwtTokenValid()) { + this.openSocket(AuthService.getJwtToken()); + } else { + this.authService.refreshJwtToken().subscribe({ + next: () => { + this.openSocket(AuthService.getJwtToken()); + }, + error: () => { + this.isOpening = false; + this.authService.logout(true, true); + } + }); + } + } + if (this.socketCloseTimer) { + clearTimeout(this.socketCloseTimer); + this.socketCloseTimer = null; + } + } + } + + private openSocket(token: string) { + const uri = `${this.notificationUri}?token=${token}`; + this.dataStream = webSocket( + { + url: uri, + openObserver: { + next: () => { + this.onOpen(); + } + }, + closeObserver: { + next: (e: CloseEvent) => { + this.onClose(e); + } + } + } + ); + + this.dataStream.subscribe({ + next: (message) => { + this.ngZone.runOutsideAngular(() => { + this.onMessage(message as WebsocketNotificationMsg); + }); + }, + error: (error) => { + this.onError(error); + } + }); + } + + private onOpen() { + this.isOpening = false; + this.isOpened = true; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + if (this.isReconnect) { + this.isReconnect = false; + this.reconnectSubscribers.forEach( + (reconnectSubscriber) => { + reconnectSubscriber.onReconnected(); + this.subscribe(reconnectSubscriber); + } + ); + this.reconnectSubscribers.clear(); + } else { + this.publishCommands(); + } + } + + private onMessage(message: WebsocketNotificationMsg) { + if (message.errorCode) { + this.showWsError(message.errorCode, message.errorMsg); + } else { + this.processOnMessage(message); + } + this.checkToClose(); + } + + private onError(errorEvent) { + if (errorEvent) { + console.warn('WebSocket error event', errorEvent); + } + this.isOpening = false; + } + + private onClose(closeEvent: CloseEvent) { + if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006 + && closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) { + this.showWsError(closeEvent.code, closeEvent.reason); + } + this.isOpening = false; + this.isOpened = false; + if (this.isActive) { + if (!this.isReconnect) { + this.reconnectSubscribers.clear(); + this.subscribersMap.forEach( + (subscriber) => { + this.reconnectSubscribers.add(subscriber); + } + ); + this.reset(false); + this.isReconnect = true; + } + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + } + this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL); + } + } + + private showWsError(errorCode: number, errorMsg: string) { + let message = errorMsg; + if (!message) { + message += `${this.errorName}: error code - ${errorCode}.`; + } + this.store.dispatch(new ActionNotificationShow( + { + message, type: 'error' + })); + } + +} diff --git a/ui-ngx/src/app/modules/home/components/notification/notification-bell.component.ts b/ui-ngx/src/app/modules/home/components/notification/notification-bell.component.ts index dfbaa53c23..69055a76de 100644 --- a/ui-ngx/src/app/modules/home/components/notification/notification-bell.component.ts +++ b/ui-ngx/src/app/modules/home/components/notification/notification-bell.component.ts @@ -19,27 +19,36 @@ import { ChangeDetectorRef, Component, NgZone, - OnInit, + OnDestroy, Renderer2, ViewContainerRef } from '@angular/core'; import { NotificationWebsocketService } from '@core/ws/notification-websocket.service'; -import { Observable } from 'rxjs'; -import { distinctUntilChanged, publishReplay, refCount, tap } from 'rxjs/operators'; +import { BehaviorSubject, ReplaySubject, Subscription } from 'rxjs'; +import { distinctUntilChanged, share, tap } from 'rxjs/operators'; import { MatButton } from '@angular/material/button'; import { TbPopoverService } from '@shared/components/popover.service'; import { ShowNotificationPopoverComponent } from '@home/components/notification/show-notification-popover.component'; -import { NotificationSubscriber } from '@shared/models/notification-ws.models'; +import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models'; @Component({ selector: 'tb-notification-bell', templateUrl: './notification-bell.component.html', changeDetection: ChangeDetectionStrategy.OnPush }) -export class NotificationBellComponent implements OnInit { +export class NotificationBellComponent implements OnDestroy { private notificationSubscriber: NotificationSubscriber; - count$: Observable; + private notificationCountSubscriber: Subscription; + private countSubject = new BehaviorSubject(0); + + count$ = this.countSubject.asObservable().pipe( + distinctUntilChanged(), + tap(() => setTimeout(() => this.cd.markForCheck())), + share({ + connector: () => new ReplaySubject(1) + }) + ); constructor( private notificationWsService: NotificationWebsocketService, @@ -48,25 +57,18 @@ export class NotificationBellComponent implements OnInit { private popoverService: TbPopoverService, private renderer: Renderer2, private viewContainerRef: ViewContainerRef) { + this.initSubscription(); } - ngOnInit() { - this.notificationSubscriber = NotificationSubscriber.createNotificationCountSubscription( - this.notificationWsService, this.zone); - this.notificationSubscriber.subscribe(); - - this.count$ = this.notificationSubscriber.notificationCount$.pipe( - distinctUntilChanged(), - publishReplay(1), - refCount(), - tap(() => setTimeout(() => this.cd.markForCheck())), - ); + ngOnDestroy() { + this.unsubscribeSubscription(); } showNotification($event: Event, createVersionButton: MatButton) { if ($event) { $event.stopPropagation(); } + this.unsubscribeSubscription(); const trigger = createVersionButton._elementRef.nativeElement; if (this.popoverService.hasPopover(trigger)) { this.popoverService.hidePopover(trigger); @@ -76,7 +78,9 @@ export class NotificationBellComponent implements OnInit { { onClose: () => { showNotificationPopover.hide(); - } + this.initSubscription(); + }, + counter: this.countSubject }, {maxHeight: '90vh', height: '100%', padding: '10px'}, {width: '400px', minWidth: '100%', maxWidth: '100%'}, @@ -84,4 +88,16 @@ export class NotificationBellComponent implements OnInit { showNotificationPopover.tbComponentRef.instance.popoverComponent = showNotificationPopover; } } + + private initSubscription() { + this.notificationSubscriber = NotificationSubscriber.createNotificationCountSubscription(this.notificationWsService, this.zone); + this.notificationCountSubscriber = this.notificationSubscriber.notificationCount$.subscribe(value => this.countSubject.next(value)); + + this.notificationSubscriber.subscribe(); + } + + private unsubscribeSubscription() { + this.notificationCountSubscriber.unsubscribe(); + this.notificationSubscriber.unsubscribe(); + } } diff --git a/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.html b/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.html index 16aed752e9..57288cc5a7 100644 --- a/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.html +++ b/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.html @@ -26,7 +26,7 @@
-
+
diff --git a/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.ts b/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.ts index 778a583e72..af928af27b 100644 --- a/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.ts +++ b/ui-ngx/src/app/modules/home/components/notification/show-notification-popover.component.ts @@ -19,12 +19,12 @@ import { PageComponent } from '@shared/components/page.component'; import { TbPopoverComponent } from '@shared/components/popover.component'; import { Store } from '@ngrx/store'; import { AppState } from '@core/core.state'; -import { Notification } from '@shared/models/notification.models'; +import { Notification, NotificationRequest } from '@shared/models/notification.models'; import { NotificationWebsocketService } from '@core/ws/notification-websocket.service'; -import { Observable } from 'rxjs'; -import { publishReplay, refCount, tap } from 'rxjs/operators'; +import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs'; +import { share, tap } from 'rxjs/operators'; import { Router } from '@angular/router'; -import { NotificationSubscriber } from '@shared/models/notification-ws.models'; +import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models'; @Component({ selector: 'tb-show-notification-popover', @@ -36,10 +36,15 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O @Input() onClose: () => void; + @Input() + counter: BehaviorSubject; + @Input() popoverComponent: TbPopoverComponent; private notificationSubscriber: NotificationSubscriber; + private notificationCountSubscriber: Subscription; + notifications$: Observable; constructor(protected store: Store, @@ -51,19 +56,22 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O } ngOnInit() { - this.notificationSubscriber = NotificationSubscriber.createNotificationsSubscription( - this.notificationWsService, this.zone); + this.notificationSubscriber = NotificationSubscriber.createNotificationsSubscription(this.notificationWsService, this.zone); this.notifications$ = this.notificationSubscriber.notifications$.pipe( - publishReplay(1), - refCount(), + share({ + connector: () => new ReplaySubject(1) + }), tap(() => setTimeout(() => this.cd.markForCheck())) ); + this.notificationCountSubscriber = this.notificationSubscriber.notificationCount$.subscribe(value => this.counter.next(value)); this.notificationSubscriber.subscribe(); } ngOnDestroy() { super.ngOnDestroy(); + this.notificationCountSubscriber.unsubscribe(); this.notificationSubscriber.unsubscribe(); + this.onClose(); } markAsRead(id: string) { @@ -86,4 +94,8 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O this.onClose(); this.router.navigateByUrl(this.router.createUrlTree(['notification-center'])).then(() => {}); } + + trackById (index: number, item: NotificationRequest): string { + return item.id.id; + } } diff --git a/ui-ngx/src/app/shared/models/public-api.ts b/ui-ngx/src/app/shared/models/public-api.ts index 684cfb1b3a..e9495fde50 100644 --- a/ui-ngx/src/app/shared/models/public-api.ts +++ b/ui-ngx/src/app/shared/models/public-api.ts @@ -39,7 +39,8 @@ export * from './event.models'; export * from './login.models'; export * from './material.models'; export * from './notification.models'; -export * from './notification-ws.models'; +export * from './websocket/notification-ws.models'; +export * from './websocket/websocket.models'; export * from './oauth2.models'; export * from './queue.models'; export * from './relation.models'; diff --git a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts index b2773d3d96..44af6afc30 100644 --- a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts +++ b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts @@ -17,7 +17,7 @@ import { EntityType } from '@shared/models/entity-type.models'; import { AggregationType } from '../time/time.models'; -import { Observable, ReplaySubject, Subject } from 'rxjs'; +import { Observable, ReplaySubject } from 'rxjs'; import { EntityId } from '@shared/models/id/entity-id'; import { map } from 'rxjs/operators'; import { NgZone } from '@angular/core'; @@ -33,6 +33,8 @@ import { PageData } from '@shared/models/page/page-data'; import { alarmFields } from '@shared/models/alarm.models'; import { entityFields } from '@shared/models/entity.models'; import { isUndefined } from '@core/utils'; +import { CmdWrapper, WsSubscriber } from '@shared/models/websocket/websocket.models'; +import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service'; export enum DataKeyType { timeseries = 'timeseries', @@ -234,7 +236,7 @@ export class AlarmDataUnsubscribeCmd implements WebsocketCmd { cmdId: number; } -export class TelemetryPluginCmdsWrapper { +export class TelemetryPluginCmdsWrapper implements CmdWrapper { constructor() { this.attrSubCmds = []; @@ -564,31 +566,20 @@ export class EntityCountUpdate extends CmdUpdate { } } -export interface TelemetryService { - subscribe(subscriber: TelemetrySubscriber); - update(subscriber: TelemetrySubscriber); - unsubscribe(subscriber: TelemetrySubscriber); -} - -export class TelemetrySubscriber { +export class TelemetrySubscriber extends WsSubscriber { private dataSubject = new ReplaySubject(1); private entityDataSubject = new ReplaySubject(1); private alarmDataSubject = new ReplaySubject(1); private entityCountSubject = new ReplaySubject(1); - private reconnectSubject = new Subject(); - private tsOffset = undefined; - public subscriptionCommands: Array; - public data$ = this.dataSubject.asObservable(); public entityData$ = this.entityDataSubject.asObservable(); public alarmData$ = this.alarmDataSubject.asObservable(); public entityCount$ = this.entityCountSubject.asObservable(); - public reconnect$ = this.reconnectSubject.asObservable(); - public static createEntityAttributesSubscription(telemetryService: TelemetryService, + public static createEntityAttributesSubscription(telemetryService: TelemetryWebsocketService, entityId: EntityId, attributeScope: TelemetryType, zone: NgZone, keys: string[] = null): TelemetrySubscriber { let subscriptionCommand: SubscriptionCmd; @@ -608,21 +599,8 @@ export class TelemetrySubscriber { return subscriber; } - constructor(private telemetryService: TelemetryService, private zone?: NgZone) { - this.subscriptionCommands = []; - } - - public subscribe() { - this.telemetryService.subscribe(this); - } - - public update() { - this.telemetryService.update(this); - } - - public unsubscribe() { - this.telemetryService.unsubscribe(this); - this.complete(); + constructor(private telemetryService: TelemetryWebsocketService, protected zone?: NgZone) { + super(telemetryService, zone); } public complete() { @@ -630,7 +608,7 @@ export class TelemetrySubscriber { this.entityDataSubject.complete(); this.alarmDataSubject.complete(); this.entityCountSubject.complete(); - this.reconnectSubject.complete(); + super.complete(); } public setTsOffset(tsOffset: number): boolean { @@ -707,10 +685,6 @@ export class TelemetrySubscriber { } } - public onReconnected() { - this.reconnectSubject.next(); - } - public attributeData$(): Observable> { const attributeData = new Array(); return this.data$.pipe( diff --git a/ui-ngx/src/app/shared/models/notification-ws.models.ts b/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts similarity index 86% rename from ui-ngx/src/app/shared/models/notification-ws.models.ts rename to ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts index c455a531e7..effd4fd334 100644 --- a/ui-ngx/src/app/shared/models/notification-ws.models.ts +++ b/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts @@ -14,12 +14,14 @@ /// limitations under the License. /// -import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs'; +import { BehaviorSubject, ReplaySubject } from 'rxjs'; import { CmdUpdate, CmdUpdateMsg, CmdUpdateType, WebsocketCmd } from '@shared/models/telemetry/telemetry.models'; import { first, map } from 'rxjs/operators'; import { NgZone } from '@angular/core'; import { isDefinedAndNotNull } from '@core/utils'; import { Notification } from '@shared/models/notification.models'; +import { CmdWrapper, WsSubscriber } from '@shared/models/websocket/websocket.models'; +import { NotificationWebsocketService } from '@core/ws/notification-websocket.service'; export class NotificationCountUpdate extends CmdUpdate { totalUnreadCount: number; @@ -43,26 +45,16 @@ export class NotificationsUpdate extends CmdUpdate { } } -export interface NotificationWsService { - subscribe(subscriber: NotificationSubscriber); - - update(subscriber: NotificationSubscriber); - - unsubscribe(subscriber: NotificationSubscriber); -} - -export class NotificationSubscriber { +export class NotificationSubscriber extends WsSubscriber { private notificationCountSubject = new ReplaySubject(1); private notificationsSubject = new BehaviorSubject(null); - private reconnectSubject = new Subject(); - public subscriptionCommands: Array; public messageLimit = 10; public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount)); public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg?.notifications || [])); - public static createNotificationCountSubscription(notificationWsService: NotificationWsService, + public static createNotificationCountSubscription(notificationWsService: NotificationWebsocketService, zone: NgZone): NotificationSubscriber { const subscriptionCommand = new UnreadCountSubCmd(); const subscriber = new NotificationSubscriber(notificationWsService, zone); @@ -70,7 +62,7 @@ export class NotificationSubscriber { return subscriber; } - public static createNotificationsSubscription(notificationWsService: NotificationWsService, + public static createNotificationsSubscription(notificationWsService: NotificationWebsocketService, zone: NgZone, limit = 10): NotificationSubscriber { const subscriptionCommand = new UnreadSubCmd(limit); const subscriber = new NotificationSubscriber(notificationWsService, zone); @@ -79,7 +71,7 @@ export class NotificationSubscriber { return subscriber; } - public static createMarkAsReadCommand(notificationWsService: NotificationWsService, + public static createMarkAsReadCommand(notificationWsService: NotificationWebsocketService, ids: string[]): NotificationSubscriber { const subscriptionCommand = new MarkAsReadCmd(ids); const subscriber = new NotificationSubscriber(notificationWsService); @@ -87,38 +79,15 @@ export class NotificationSubscriber { return subscriber; } - public static createMarkAllAsReadCommand(notificationWsService: NotificationWsService): NotificationSubscriber { + public static createMarkAllAsReadCommand(notificationWsService: NotificationWebsocketService): NotificationSubscriber { const subscriptionCommand = new MarkAllAsReadCmd(); const subscriber = new NotificationSubscriber(notificationWsService); subscriber.subscriptionCommands.push(subscriptionCommand); return subscriber; } - constructor(private notificationWsService: NotificationWsService, private zone?: NgZone) { - this.subscriptionCommands = []; - } - - public subscribe() { - this.notificationWsService.subscribe(this); - } - - public update() { - this.notificationWsService.update(this); - } - - public unsubscribe() { - this.notificationWsService.unsubscribe(this); - this.complete(); - } - - public onReconnected() { - this.reconnectSubject.next(); - } - - public complete() { - this.notificationCountSubject.complete(); - this.notificationsSubject.complete(); - this.reconnectSubject.complete(); + constructor(private notificationWsService: NotificationWebsocketService, protected zone?: NgZone) { + super(notificationWsService, zone); } onNotificationCountUpdate(message: NotificationCountUpdate) { @@ -133,6 +102,12 @@ export class NotificationSubscriber { } } + public complete() { + this.notificationCountSubject.complete(); + this.notificationsSubject.complete(); + super.complete(); + } + onNotificationsUpdate(message: NotificationsUpdate) { this.notificationsSubject.asObservable().pipe( first() @@ -221,7 +196,7 @@ export function isNotificationsUpdateMsg(message: WebsocketNotificationMsg): mes return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS; } -export class NotificationPluginCmdsWrapper { +export class NotificationPluginCmdWrapper implements CmdWrapper { constructor() { this.unreadCountSubCmd = null; @@ -253,13 +228,14 @@ export class NotificationPluginCmdsWrapper { this.markAllAsReadCmd = null; } - public preparePublishCommands(): NotificationPluginCmdsWrapper { - const preparedWrapper = new NotificationPluginCmdsWrapper(); + public preparePublishCommands(): NotificationPluginCmdWrapper { + const preparedWrapper = new NotificationPluginCmdWrapper(); preparedWrapper.unreadCountSubCmd = this.unreadCountSubCmd || undefined; preparedWrapper.unreadSubCmd = this.unreadSubCmd || undefined; preparedWrapper.unsubCmd = this.unsubCmd || undefined; preparedWrapper.markAsReadCmd = this.markAsReadCmd || undefined; preparedWrapper.markAllAsReadCmd = this.markAllAsReadCmd || undefined; + this.clear(); return preparedWrapper; } } diff --git a/ui-ngx/src/app/shared/models/websocket/websocket.models.ts b/ui-ngx/src/app/shared/models/websocket/websocket.models.ts new file mode 100644 index 0000000000..5e5ff77231 --- /dev/null +++ b/ui-ngx/src/app/shared/models/websocket/websocket.models.ts @@ -0,0 +1,51 @@ +import { NgZone } from '@angular/core'; +import { WebsocketCmd } from '@shared/models/telemetry/telemetry.models'; +import { Subject } from 'rxjs'; + +export interface WsService { + subscribe(subscriber: T); + update(subscriber: T); + unsubscribe(subscriber: T); +} + +export abstract class CmdWrapper { + abstract hasCommands(): boolean; + abstract clear(): void; + abstract preparePublishCommands(maxCommands: number): CmdWrapper; + + [key: string]: WebsocketCmd | any; +} + +export abstract class WsSubscriber { + + protected reconnectSubject = new Subject(); + + subscriptionCommands: Array; + + reconnect$ = this.reconnectSubject.asObservable(); + + protected constructor(protected wsService: WsService, protected zone?: NgZone) { + this.subscriptionCommands = []; + } + + public subscribe() { + this.wsService.subscribe(this); + } + + public update() { + this.wsService.update(this); + } + + public unsubscribe() { + this.wsService.unsubscribe(this); + this.complete(); + } + + public complete() { + this.reconnectSubject.complete(); + } + + public onReconnected() { + this.reconnectSubject.next(); + } +}