UI: Refactoring WS after review
This commit is contained in:
parent
2f2e5911fd
commit
1d674046d3
@ -32,12 +32,20 @@ import {
|
|||||||
isAlarmDataUpdateMsg,
|
isAlarmDataUpdateMsg,
|
||||||
isEntityCountUpdateMsg,
|
isEntityCountUpdateMsg,
|
||||||
isEntityDataUpdateMsg,
|
isEntityDataUpdateMsg,
|
||||||
|
isNotificationCountUpdateMsg,
|
||||||
|
isNotificationsUpdateMsg,
|
||||||
|
MarkAllAsReadCmd,
|
||||||
|
MarkAsReadCmd,
|
||||||
NotificationCountUpdate,
|
NotificationCountUpdate,
|
||||||
|
NotificationSubscriber,
|
||||||
NotificationsUpdate,
|
NotificationsUpdate,
|
||||||
SubscriptionCmd,
|
SubscriptionCmd,
|
||||||
SubscriptionUpdate,
|
SubscriptionUpdate,
|
||||||
TelemetryPluginCmdsWrapper,
|
TelemetryPluginCmdsWrapper,
|
||||||
TelemetrySubscriber,
|
TelemetrySubscriber,
|
||||||
|
UnreadCountSubCmd,
|
||||||
|
UnreadSubCmd,
|
||||||
|
UnsubscribeCmd,
|
||||||
WebsocketDataMsg
|
WebsocketDataMsg
|
||||||
} from '@app/shared/models/telemetry/telemetry.models';
|
} from '@app/shared/models/telemetry/telemetry.models';
|
||||||
import { Store } from '@ngrx/store';
|
import { Store } from '@ngrx/store';
|
||||||
@ -45,16 +53,6 @@ import { AppState } from '@core/core.state';
|
|||||||
import { AuthService } from '@core/auth/auth.service';
|
import { AuthService } from '@core/auth/auth.service';
|
||||||
import { WINDOW } from '@core/services/window.service';
|
import { WINDOW } from '@core/services/window.service';
|
||||||
import { WebsocketService } from '@core/ws/websocket.service';
|
import { WebsocketService } from '@core/ws/websocket.service';
|
||||||
import {
|
|
||||||
isNotificationCountUpdateMsg,
|
|
||||||
isNotificationsUpdateMsg,
|
|
||||||
MarkAllAsReadCmd,
|
|
||||||
MarkAsReadCmd,
|
|
||||||
NotificationSubscriber,
|
|
||||||
UnreadCountSubCmd,
|
|
||||||
UnreadSubCmd,
|
|
||||||
UnsubscribeCmd
|
|
||||||
} from '@shared/models/websocket/notification-ws.models';
|
|
||||||
|
|
||||||
// @dynamic
|
// @dynamic
|
||||||
@Injectable({
|
@Injectable({
|
||||||
@ -145,19 +143,19 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
|
|||||||
if ('cmdId' in message && message.cmdId) {
|
if ('cmdId' in message && message.cmdId) {
|
||||||
subscriber = this.subscribersMap.get(message.cmdId);
|
subscriber = this.subscribersMap.get(message.cmdId);
|
||||||
if (subscriber instanceof NotificationSubscriber) {
|
if (subscriber instanceof NotificationSubscriber) {
|
||||||
if (isNotificationCountUpdateMsg(message) && subscriber) {
|
if (isNotificationCountUpdateMsg(message)) {
|
||||||
subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message));
|
subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message));
|
||||||
} else if (isNotificationsUpdateMsg(message) && subscriber) {
|
} else if (isNotificationsUpdateMsg(message)) {
|
||||||
subscriber.onNotificationsUpdate(new NotificationsUpdate(message));
|
subscriber.onNotificationsUpdate(new NotificationsUpdate(message));
|
||||||
}
|
}
|
||||||
} else if (subscriber instanceof TelemetrySubscriber) {
|
} else if (subscriber instanceof TelemetrySubscriber) {
|
||||||
if (isEntityDataUpdateMsg(message) && subscriber) {
|
if (isEntityDataUpdateMsg(message)) {
|
||||||
subscriber.onEntityData(new EntityDataUpdate(message));
|
subscriber.onEntityData(new EntityDataUpdate(message));
|
||||||
} else if (isAlarmDataUpdateMsg(message) && subscriber) {
|
} else if (isAlarmDataUpdateMsg(message)) {
|
||||||
subscriber.onAlarmData(new AlarmDataUpdate(message));
|
subscriber.onAlarmData(new AlarmDataUpdate(message));
|
||||||
} else if (isEntityCountUpdateMsg(message) && subscriber) {
|
} else if (isEntityCountUpdateMsg(message)) {
|
||||||
subscriber.onEntityCount(new EntityCountUpdate(message));
|
subscriber.onEntityCount(new EntityCountUpdate(message));
|
||||||
} else if (isAlarmCountUpdateMsg(message) && subscriber) {
|
} else if (isAlarmCountUpdateMsg(message)) {
|
||||||
subscriber.onAlarmCount(new AlarmCountUpdate(message));
|
subscriber.onAlarmCount(new AlarmCountUpdate(message));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,11 +25,11 @@ import {
|
|||||||
AuthCmd,
|
AuthCmd,
|
||||||
AuthWsCmd,
|
AuthWsCmd,
|
||||||
CmdUpdateMsg,
|
CmdUpdateMsg,
|
||||||
|
NotificationSubscriber,
|
||||||
TelemetrySubscriber,
|
TelemetrySubscriber,
|
||||||
WebsocketDataMsg
|
WebsocketDataMsg
|
||||||
} from '@shared/models/telemetry/telemetry.models';
|
} from '@shared/models/telemetry/telemetry.models';
|
||||||
import { ActionNotificationShow } from '@core/notification/notification.actions';
|
import { ActionNotificationShow } from '@core/notification/notification.actions';
|
||||||
import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models';
|
|
||||||
import Timeout = NodeJS.Timeout;
|
import Timeout = NodeJS.Timeout;
|
||||||
|
|
||||||
const RECONNECT_INTERVAL = 2000;
|
const RECONNECT_INTERVAL = 2000;
|
||||||
|
|||||||
@ -29,7 +29,7 @@ import { distinctUntilChanged, map, share, skip, tap } from 'rxjs/operators';
|
|||||||
import { MatButton } from '@angular/material/button';
|
import { MatButton } from '@angular/material/button';
|
||||||
import { TbPopoverService } from '@shared/components/popover.service';
|
import { TbPopoverService } from '@shared/components/popover.service';
|
||||||
import { ShowNotificationPopoverComponent } from '@home/components/notification/show-notification-popover.component';
|
import { ShowNotificationPopoverComponent } from '@home/components/notification/show-notification-popover.component';
|
||||||
import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models';
|
import { NotificationSubscriber } from '@shared/models/telemetry/telemetry.models';
|
||||||
import { select, Store } from '@ngrx/store';
|
import { select, Store } from '@ngrx/store';
|
||||||
import { selectIsAuthenticated } from '@core/auth/auth.selectors';
|
import { selectIsAuthenticated } from '@core/auth/auth.selectors';
|
||||||
import { AppState } from '@core/core.state';
|
import { AppState } from '@core/core.state';
|
||||||
|
|||||||
@ -24,7 +24,7 @@ import { NotificationWebsocketService } from '@core/ws/notification-websocket.se
|
|||||||
import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs';
|
import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs';
|
||||||
import { map, share, skip, tap } from 'rxjs/operators';
|
import { map, share, skip, tap } from 'rxjs/operators';
|
||||||
import { Router } from '@angular/router';
|
import { Router } from '@angular/router';
|
||||||
import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models';
|
import { NotificationSubscriber } from '@shared/models/telemetry/telemetry.models';
|
||||||
|
|
||||||
@Component({
|
@Component({
|
||||||
selector: 'tb-show-notification-popover',
|
selector: 'tb-show-notification-popover',
|
||||||
|
|||||||
@ -41,7 +41,6 @@ export * from './limited-api.models';
|
|||||||
export * from './login.models';
|
export * from './login.models';
|
||||||
export * from './material.models';
|
export * from './material.models';
|
||||||
export * from './notification.models';
|
export * from './notification.models';
|
||||||
export * from './websocket/notification-ws.models';
|
|
||||||
export * from './websocket/websocket.models';
|
export * from './websocket/websocket.models';
|
||||||
export * from './oauth2.models';
|
export * from './oauth2.models';
|
||||||
export * from './ota-package.models';
|
export * from './ota-package.models';
|
||||||
|
|||||||
@ -17,7 +17,7 @@
|
|||||||
|
|
||||||
import { EntityType } from '@shared/models/entity-type.models';
|
import { EntityType } from '@shared/models/entity-type.models';
|
||||||
import { AggregationType } from '../time/time.models';
|
import { AggregationType } from '../time/time.models';
|
||||||
import { Observable, ReplaySubject } from 'rxjs';
|
import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs';
|
||||||
import { EntityId } from '@shared/models/id/entity-id';
|
import { EntityId } from '@shared/models/id/entity-id';
|
||||||
import { map } from 'rxjs/operators';
|
import { map } from 'rxjs/operators';
|
||||||
import { NgZone } from '@angular/core';
|
import { NgZone } from '@angular/core';
|
||||||
@ -35,11 +35,11 @@ import {
|
|||||||
import { PageData } from '@shared/models/page/page-data';
|
import { PageData } from '@shared/models/page/page-data';
|
||||||
import { alarmFields } from '@shared/models/alarm.models';
|
import { alarmFields } from '@shared/models/alarm.models';
|
||||||
import { entityFields } from '@shared/models/entity.models';
|
import { entityFields } from '@shared/models/entity.models';
|
||||||
import { isUndefined } from '@core/utils';
|
import { isDefinedAndNotNull, isUndefined } from '@core/utils';
|
||||||
import { CmdWrapper, WsSubscriber } from '@shared/models/websocket/websocket.models';
|
import { CmdWrapper, WsService, WsSubscriber } from '@shared/models/websocket/websocket.models';
|
||||||
import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
|
import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
|
||||||
import { NotificationCountUpdateMsg, NotificationsUpdateMsg } from '@shared/models/websocket/notification-ws.models';
|
|
||||||
import { Notification } from '@shared/models/notification.models';
|
import { Notification } from '@shared/models/notification.models';
|
||||||
|
import { WebsocketService } from '@core/ws/websocket.service';
|
||||||
|
|
||||||
export const NOT_SUPPORTED = 'Not supported!';
|
export const NOT_SUPPORTED = 'Not supported!';
|
||||||
|
|
||||||
@ -275,6 +275,37 @@ export class AlarmCountCmd implements WebsocketCmd {
|
|||||||
type = WsCmdType.ALARM_COUNT;
|
type = WsCmdType.ALARM_COUNT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class UnreadCountSubCmd implements WebsocketCmd {
|
||||||
|
cmdId: number;
|
||||||
|
type = WsCmdType.NOTIFICATIONS_COUNT;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class UnreadSubCmd implements WebsocketCmd {
|
||||||
|
limit: number;
|
||||||
|
cmdId: number;
|
||||||
|
type = WsCmdType.NOTIFICATIONS;
|
||||||
|
|
||||||
|
constructor(limit = 10) {
|
||||||
|
this.limit = limit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class MarkAsReadCmd implements WebsocketCmd {
|
||||||
|
|
||||||
|
cmdId: number;
|
||||||
|
notifications: string[];
|
||||||
|
type = WsCmdType.MARK_NOTIFICATIONS_AS_READ;
|
||||||
|
|
||||||
|
constructor(ids: string[]) {
|
||||||
|
this.notifications = ids;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class MarkAllAsReadCmd implements WebsocketCmd {
|
||||||
|
cmdId: number;
|
||||||
|
type = WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ;
|
||||||
|
}
|
||||||
|
|
||||||
export class EntityDataUnsubscribeCmd implements WebsocketCmd {
|
export class EntityDataUnsubscribeCmd implements WebsocketCmd {
|
||||||
cmdId: number;
|
cmdId: number;
|
||||||
type = WsCmdType.ENTITY_DATA_UNSUBSCRIBE;
|
type = WsCmdType.ENTITY_DATA_UNSUBSCRIBE;
|
||||||
@ -295,6 +326,11 @@ export class AlarmCountUnsubscribeCmd implements WebsocketCmd {
|
|||||||
type = WsCmdType.ALARM_COUNT_UNSUBSCRIBE;
|
type = WsCmdType.ALARM_COUNT_UNSUBSCRIBE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class UnsubscribeCmd implements WebsocketCmd {
|
||||||
|
cmdId: number;
|
||||||
|
type = WsCmdType.NOTIFICATIONS_UNSUBSCRIBE;
|
||||||
|
}
|
||||||
|
|
||||||
export class AuthCmd implements WebsocketCmd {
|
export class AuthCmd implements WebsocketCmd {
|
||||||
cmdId = 0;
|
cmdId = 0;
|
||||||
type: WsCmdType.AUTH;
|
type: WsCmdType.AUTH;
|
||||||
@ -396,6 +432,20 @@ export interface AlarmCountUpdateMsg extends CmdUpdateMsg {
|
|||||||
count: number;
|
count: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface NotificationCountUpdateMsg extends CmdUpdateMsg {
|
||||||
|
cmdUpdateType: CmdUpdateType.NOTIFICATIONS_COUNT;
|
||||||
|
totalUnreadCount: number;
|
||||||
|
sequenceNumber: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface NotificationsUpdateMsg extends CmdUpdateMsg {
|
||||||
|
cmdUpdateType: CmdUpdateType.NOTIFICATIONS;
|
||||||
|
update?: Notification;
|
||||||
|
notifications?: Notification[];
|
||||||
|
totalUnreadCount: number;
|
||||||
|
sequenceNumber: number;
|
||||||
|
}
|
||||||
|
|
||||||
export type WebsocketDataMsg = AlarmDataUpdateMsg | AlarmCountUpdateMsg |
|
export type WebsocketDataMsg = AlarmDataUpdateMsg | AlarmCountUpdateMsg |
|
||||||
EntityDataUpdateMsg | EntityCountUpdateMsg | SubscriptionUpdateMsg | NotificationCountUpdateMsg | NotificationsUpdateMsg;
|
EntityDataUpdateMsg | EntityCountUpdateMsg | SubscriptionUpdateMsg | NotificationCountUpdateMsg | NotificationsUpdateMsg;
|
||||||
|
|
||||||
@ -419,6 +469,16 @@ export const isAlarmCountUpdateMsg = (message: WebsocketDataMsg): message is Ala
|
|||||||
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.ALARM_COUNT_DATA;
|
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.ALARM_COUNT_DATA;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const isNotificationCountUpdateMsg = (message: WebsocketDataMsg): message is NotificationCountUpdateMsg => {
|
||||||
|
const updateMsg = (message as CmdUpdateMsg);
|
||||||
|
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS_COUNT;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const isNotificationsUpdateMsg = (message: WebsocketDataMsg): message is NotificationsUpdateMsg => {
|
||||||
|
const updateMsg = (message as CmdUpdateMsg);
|
||||||
|
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS;
|
||||||
|
};
|
||||||
|
|
||||||
export class SubscriptionUpdate implements SubscriptionUpdateMsg {
|
export class SubscriptionUpdate implements SubscriptionUpdateMsg {
|
||||||
subscriptionId: number;
|
subscriptionId: number;
|
||||||
errorCode: number;
|
errorCode: number;
|
||||||
@ -796,3 +856,113 @@ export class TelemetrySubscriber extends WsSubscriber {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class NotificationSubscriber extends WsSubscriber {
|
||||||
|
private notificationCountSubject = new BehaviorSubject<NotificationCountUpdate>({
|
||||||
|
cmdId: 0,
|
||||||
|
cmdUpdateType: undefined,
|
||||||
|
errorCode: 0,
|
||||||
|
errorMsg: '',
|
||||||
|
totalUnreadCount: 0,
|
||||||
|
sequenceNumber: 0
|
||||||
|
});
|
||||||
|
private notificationsSubject = new BehaviorSubject<NotificationsUpdate>({
|
||||||
|
cmdId: 0,
|
||||||
|
cmdUpdateType: undefined,
|
||||||
|
errorCode: 0,
|
||||||
|
errorMsg: '',
|
||||||
|
notifications: null,
|
||||||
|
totalUnreadCount: 0,
|
||||||
|
sequenceNumber: 0
|
||||||
|
});
|
||||||
|
|
||||||
|
public messageLimit = 10;
|
||||||
|
|
||||||
|
public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount));
|
||||||
|
public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg.notifications ));
|
||||||
|
|
||||||
|
public static createNotificationCountSubscription(websocketService: WebsocketService<WsSubscriber>,
|
||||||
|
zone: NgZone): NotificationSubscriber {
|
||||||
|
const subscriptionCommand = new UnreadCountSubCmd();
|
||||||
|
const subscriber = new NotificationSubscriber(websocketService, zone);
|
||||||
|
subscriber.subscriptionCommands.push(subscriptionCommand);
|
||||||
|
return subscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static createNotificationsSubscription(websocketService: WebsocketService<WsSubscriber>,
|
||||||
|
zone: NgZone, limit = 10): NotificationSubscriber {
|
||||||
|
const subscriptionCommand = new UnreadSubCmd(limit);
|
||||||
|
const subscriber = new NotificationSubscriber(websocketService, zone);
|
||||||
|
subscriber.messageLimit = limit;
|
||||||
|
subscriber.subscriptionCommands.push(subscriptionCommand);
|
||||||
|
return subscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static createMarkAsReadCommand(websocketService: WebsocketService<WsSubscriber>,
|
||||||
|
ids: string[]): NotificationSubscriber {
|
||||||
|
const subscriptionCommand = new MarkAsReadCmd(ids);
|
||||||
|
const subscriber = new NotificationSubscriber(websocketService);
|
||||||
|
subscriber.subscriptionCommands.push(subscriptionCommand);
|
||||||
|
return subscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static createMarkAllAsReadCommand(websocketService: WebsocketService<WsSubscriber>): NotificationSubscriber {
|
||||||
|
const subscriptionCommand = new MarkAllAsReadCmd();
|
||||||
|
const subscriber = new NotificationSubscriber(websocketService);
|
||||||
|
subscriber.subscriptionCommands.push(subscriptionCommand);
|
||||||
|
return subscriber;
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(private websocketService: WsService<any>, protected zone?: NgZone) {
|
||||||
|
super(websocketService, zone);
|
||||||
|
}
|
||||||
|
|
||||||
|
onNotificationCountUpdate(message: NotificationCountUpdate) {
|
||||||
|
const currentNotificationCount = this.notificationCountSubject.value;
|
||||||
|
if (message.sequenceNumber <= currentNotificationCount.sequenceNumber) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (this.zone) {
|
||||||
|
this.zone.run(
|
||||||
|
() => {
|
||||||
|
this.notificationCountSubject.next(message);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.notificationCountSubject.next(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public complete() {
|
||||||
|
this.notificationCountSubject.complete();
|
||||||
|
this.notificationsSubject.complete();
|
||||||
|
super.complete();
|
||||||
|
}
|
||||||
|
|
||||||
|
onNotificationsUpdate(message: NotificationsUpdate) {
|
||||||
|
const currentNotifications = this.notificationsSubject.value;
|
||||||
|
if (message.sequenceNumber <= currentNotifications.sequenceNumber) {
|
||||||
|
message.totalUnreadCount = currentNotifications.totalUnreadCount;
|
||||||
|
}
|
||||||
|
let processMessage = message;
|
||||||
|
if (isDefinedAndNotNull(currentNotifications) && message.update) {
|
||||||
|
currentNotifications.notifications.unshift(message.update);
|
||||||
|
if (currentNotifications.notifications.length > this.messageLimit) {
|
||||||
|
currentNotifications.notifications.pop();
|
||||||
|
}
|
||||||
|
processMessage = currentNotifications;
|
||||||
|
processMessage.totalUnreadCount = message.totalUnreadCount;
|
||||||
|
}
|
||||||
|
if (this.zone) {
|
||||||
|
this.zone.run(
|
||||||
|
() => {
|
||||||
|
this.notificationsSubject.next(processMessage);
|
||||||
|
this.notificationCountSubject.next(processMessage);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.notificationsSubject.next(processMessage);
|
||||||
|
this.notificationCountSubject.next(processMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1,202 +0,0 @@
|
|||||||
///
|
|
||||||
/// Copyright © 2016-2023 The Thingsboard Authors
|
|
||||||
///
|
|
||||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
/// you may not use this file except in compliance with the License.
|
|
||||||
/// You may obtain a copy of the License at
|
|
||||||
///
|
|
||||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
///
|
|
||||||
/// Unless required by applicable law or agreed to in writing, software
|
|
||||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
/// See the License for the specific language governing permissions and
|
|
||||||
/// limitations under the License.
|
|
||||||
///
|
|
||||||
|
|
||||||
import {
|
|
||||||
CmdUpdateMsg,
|
|
||||||
CmdUpdateType,
|
|
||||||
NotificationCountUpdate,
|
|
||||||
NotificationsUpdate,
|
|
||||||
WebsocketCmd,
|
|
||||||
WebsocketDataMsg,
|
|
||||||
WsCmdType
|
|
||||||
} from '@shared/models/telemetry/telemetry.models';
|
|
||||||
import { NgZone } from '@angular/core';
|
|
||||||
import { isDefinedAndNotNull } from '@core/utils';
|
|
||||||
import { Notification } from '@shared/models/notification.models';
|
|
||||||
import { WsService, WsSubscriber } from '@shared/models/websocket/websocket.models';
|
|
||||||
import { BehaviorSubject } from 'rxjs';
|
|
||||||
import { map } from 'rxjs/operators';
|
|
||||||
import { WebsocketService } from '@core/ws/websocket.service';
|
|
||||||
|
|
||||||
export class NotificationSubscriber extends WsSubscriber {
|
|
||||||
private notificationCountSubject = new BehaviorSubject<NotificationCountUpdate>({
|
|
||||||
cmdId: 0,
|
|
||||||
cmdUpdateType: undefined,
|
|
||||||
errorCode: 0,
|
|
||||||
errorMsg: '',
|
|
||||||
totalUnreadCount: 0,
|
|
||||||
sequenceNumber: 0
|
|
||||||
});
|
|
||||||
private notificationsSubject = new BehaviorSubject<NotificationsUpdate>({
|
|
||||||
cmdId: 0,
|
|
||||||
cmdUpdateType: undefined,
|
|
||||||
errorCode: 0,
|
|
||||||
errorMsg: '',
|
|
||||||
notifications: null,
|
|
||||||
totalUnreadCount: 0,
|
|
||||||
sequenceNumber: 0
|
|
||||||
});
|
|
||||||
|
|
||||||
public messageLimit = 10;
|
|
||||||
|
|
||||||
public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount));
|
|
||||||
public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg.notifications ));
|
|
||||||
|
|
||||||
public static createNotificationCountSubscription(websocketService: WebsocketService<WsSubscriber>,
|
|
||||||
zone: NgZone): NotificationSubscriber {
|
|
||||||
const subscriptionCommand = new UnreadCountSubCmd();
|
|
||||||
const subscriber = new NotificationSubscriber(websocketService, zone);
|
|
||||||
subscriber.subscriptionCommands.push(subscriptionCommand);
|
|
||||||
return subscriber;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static createNotificationsSubscription(websocketService: WebsocketService<WsSubscriber>,
|
|
||||||
zone: NgZone, limit = 10): NotificationSubscriber {
|
|
||||||
const subscriptionCommand = new UnreadSubCmd(limit);
|
|
||||||
const subscriber = new NotificationSubscriber(websocketService, zone);
|
|
||||||
subscriber.messageLimit = limit;
|
|
||||||
subscriber.subscriptionCommands.push(subscriptionCommand);
|
|
||||||
return subscriber;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static createMarkAsReadCommand(websocketService: WebsocketService<WsSubscriber>,
|
|
||||||
ids: string[]): NotificationSubscriber {
|
|
||||||
const subscriptionCommand = new MarkAsReadCmd(ids);
|
|
||||||
const subscriber = new NotificationSubscriber(websocketService);
|
|
||||||
subscriber.subscriptionCommands.push(subscriptionCommand);
|
|
||||||
return subscriber;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static createMarkAllAsReadCommand(websocketService: WebsocketService<WsSubscriber>): NotificationSubscriber {
|
|
||||||
const subscriptionCommand = new MarkAllAsReadCmd();
|
|
||||||
const subscriber = new NotificationSubscriber(websocketService);
|
|
||||||
subscriber.subscriptionCommands.push(subscriptionCommand);
|
|
||||||
return subscriber;
|
|
||||||
}
|
|
||||||
|
|
||||||
constructor(private websocketService: WsService<any>, protected zone?: NgZone) {
|
|
||||||
super(websocketService, zone);
|
|
||||||
}
|
|
||||||
|
|
||||||
onNotificationCountUpdate(message: NotificationCountUpdate) {
|
|
||||||
const currentNotificationCount = this.notificationCountSubject.value;
|
|
||||||
if (message.sequenceNumber <= currentNotificationCount.sequenceNumber) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.zone) {
|
|
||||||
this.zone.run(
|
|
||||||
() => {
|
|
||||||
this.notificationCountSubject.next(message);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
this.notificationCountSubject.next(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public complete() {
|
|
||||||
this.notificationCountSubject.complete();
|
|
||||||
this.notificationsSubject.complete();
|
|
||||||
super.complete();
|
|
||||||
}
|
|
||||||
|
|
||||||
onNotificationsUpdate(message: NotificationsUpdate) {
|
|
||||||
const currentNotifications = this.notificationsSubject.value;
|
|
||||||
if (message.sequenceNumber <= currentNotifications.sequenceNumber) {
|
|
||||||
message.totalUnreadCount = currentNotifications.totalUnreadCount;
|
|
||||||
}
|
|
||||||
let processMessage = message;
|
|
||||||
if (isDefinedAndNotNull(currentNotifications) && message.update) {
|
|
||||||
currentNotifications.notifications.unshift(message.update);
|
|
||||||
if (currentNotifications.notifications.length > this.messageLimit) {
|
|
||||||
currentNotifications.notifications.pop();
|
|
||||||
}
|
|
||||||
processMessage = currentNotifications;
|
|
||||||
processMessage.totalUnreadCount = message.totalUnreadCount;
|
|
||||||
}
|
|
||||||
if (this.zone) {
|
|
||||||
this.zone.run(
|
|
||||||
() => {
|
|
||||||
this.notificationsSubject.next(processMessage);
|
|
||||||
this.notificationCountSubject.next(processMessage);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
this.notificationsSubject.next(processMessage);
|
|
||||||
this.notificationCountSubject.next(processMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class UnreadCountSubCmd implements WebsocketCmd {
|
|
||||||
cmdId: number;
|
|
||||||
type = WsCmdType.NOTIFICATIONS_COUNT;
|
|
||||||
}
|
|
||||||
|
|
||||||
export class UnreadSubCmd implements WebsocketCmd {
|
|
||||||
limit: number;
|
|
||||||
cmdId: number;
|
|
||||||
type = WsCmdType.NOTIFICATIONS;
|
|
||||||
|
|
||||||
constructor(limit = 10) {
|
|
||||||
this.limit = limit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class UnsubscribeCmd implements WebsocketCmd {
|
|
||||||
cmdId: number;
|
|
||||||
type = WsCmdType.NOTIFICATIONS_UNSUBSCRIBE;
|
|
||||||
}
|
|
||||||
|
|
||||||
export class MarkAsReadCmd implements WebsocketCmd {
|
|
||||||
|
|
||||||
cmdId: number;
|
|
||||||
notifications: string[];
|
|
||||||
type = WsCmdType.MARK_NOTIFICATIONS_AS_READ;
|
|
||||||
|
|
||||||
constructor(ids: string[]) {
|
|
||||||
this.notifications = ids;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export class MarkAllAsReadCmd implements WebsocketCmd {
|
|
||||||
cmdId: number;
|
|
||||||
type = WsCmdType.MARK_ALL_NOTIFICATIONS_AS_READ;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface NotificationCountUpdateMsg extends CmdUpdateMsg {
|
|
||||||
cmdUpdateType: CmdUpdateType.NOTIFICATIONS_COUNT;
|
|
||||||
totalUnreadCount: number;
|
|
||||||
sequenceNumber: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface NotificationsUpdateMsg extends CmdUpdateMsg {
|
|
||||||
cmdUpdateType: CmdUpdateType.NOTIFICATIONS;
|
|
||||||
update?: Notification;
|
|
||||||
notifications?: Notification[];
|
|
||||||
totalUnreadCount: number;
|
|
||||||
sequenceNumber: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const isNotificationCountUpdateMsg = (message: WebsocketDataMsg): message is NotificationCountUpdateMsg => {
|
|
||||||
const updateMsg = (message as CmdUpdateMsg);
|
|
||||||
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS_COUNT;
|
|
||||||
};
|
|
||||||
|
|
||||||
export const isNotificationsUpdateMsg = (message: WebsocketDataMsg): message is NotificationsUpdateMsg => {
|
|
||||||
const updateMsg = (message as CmdUpdateMsg);
|
|
||||||
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS;
|
|
||||||
};
|
|
||||||
Loading…
x
Reference in New Issue
Block a user