diff --git a/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts b/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts index da876d0d3f..0acbba1139 100644 --- a/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts +++ b/ui-ngx/src/app/shared/models/websocket/notification-ws.models.ts @@ -16,7 +16,7 @@ import { BehaviorSubject, ReplaySubject } from 'rxjs'; import { CmdUpdate, CmdUpdateMsg, CmdUpdateType, WebsocketCmd } from '@shared/models/telemetry/telemetry.models'; -import { first, map } from 'rxjs/operators'; +import { map } from 'rxjs/operators'; import { NgZone } from '@angular/core'; import { isDefinedAndNotNull } from '@core/utils'; import { Notification } from '@shared/models/notification.models'; @@ -47,12 +47,19 @@ export class NotificationsUpdate extends CmdUpdate { export class NotificationSubscriber extends WsSubscriber { private notificationCountSubject = new ReplaySubject(1); - private notificationsSubject = new BehaviorSubject(null); + private notificationsSubject = new BehaviorSubject({ + cmdId: 0, + cmdUpdateType: undefined, + errorCode: 0, + errorMsg: '', + notifications: [], + totalUnreadCount: 0 + }); public messageLimit = 10; public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount)); - public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg?.notifications || [])); + public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg.notifications )); public static createNotificationCountSubscription(notificationWsService: NotificationWebsocketService, zone: NgZone): NotificationSubscriber { @@ -109,35 +116,27 @@ export class NotificationSubscriber extends WsSubscriber { } onNotificationsUpdate(message: NotificationsUpdate) { - this.notificationsSubject.asObservable().pipe( - first() - ).subscribe((value) => { - let saveMessage; - if (isDefinedAndNotNull(value) && message.update) { - const findIndex = value.notifications.findIndex(item => item.id.id === message.update.id.id); - if (findIndex !== -1) { - value.notifications.push(message.update); - value.notifications.sort((a, b) => b.createdTime - a.createdTime); - if (value.notifications.length > this.messageLimit) { - value.notifications.pop(); - } + const currentNotifications = this.notificationsSubject.value; + let processMessage = message; + if (isDefinedAndNotNull(currentNotifications) && message.update) { + currentNotifications.notifications.unshift(message.update); + if (currentNotifications.notifications.length > this.messageLimit) { + currentNotifications.notifications.pop(); + } + processMessage = currentNotifications; + processMessage.totalUnreadCount = message.totalUnreadCount; + } + if (this.zone) { + this.zone.run( + () => { + this.notificationsSubject.next(processMessage); + this.notificationCountSubject.next(processMessage); } - saveMessage = value; - } else { - saveMessage = message; - } - if (this.zone) { - this.zone.run( - () => { - this.notificationsSubject.next(saveMessage); - this.notificationCountSubject.next(saveMessage); - } - ); - } else { - this.notificationsSubject.next(saveMessage); - this.notificationCountSubject.next(saveMessage); - } - }); + ); + } else { + this.notificationsSubject.next(processMessage); + this.notificationCountSubject.next(processMessage); + } } }