UI: Refactoring web socket

This commit is contained in:
Vladyslav_Prykhodko 2023-02-16 12:59:43 +02:00
parent eddb06b536
commit 8589047fcc
12 changed files with 460 additions and 596 deletions

View File

@ -17,7 +17,6 @@
import {
AlarmDataCmd,
DataKeyType,
TelemetryService,
TelemetrySubscriber
} from '@shared/models/telemetry/telemetry.models';
import { DatasourceType } from '@shared/models/widget.models';
@ -34,6 +33,7 @@ import { AlarmDataListener } from '@core/api/alarm-data.service';
import { PageData } from '@shared/models/page/page-data';
import { deepClone, isDefined, isDefinedAndNotNull, isObject } from '@core/utils';
import { simulatedAlarm } from '@shared/models/alarm.models';
import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
export interface AlarmSubscriptionDataKey {
name: string;
@ -68,7 +68,7 @@ export class AlarmDataSubscription {
private subsTw: SubscriptionTimewindow;
constructor(private listener: AlarmDataListener,
private telemetryService: TelemetryService) {
private telemetryService: TelemetryWebsocketService) {
}
public unsubscribe() {

View File

@ -41,7 +41,6 @@ import {
EntityDataCmd,
IndexedSubscriptionData,
SubscriptionData,
TelemetryService,
TelemetrySubscriber
} from '@shared/models/telemetry/telemetry.models';
import { UtilsService } from '@core/services/utils.service';
@ -53,6 +52,7 @@ import { NULL_UUID } from '@shared/models/id/has-uuid';
import { EntityType } from '@shared/models/entity-type.models';
import { Observable, of, ReplaySubject, Subject } from 'rxjs';
import { EntityId } from '@shared/models/id/entity-id';
import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
import Timeout = NodeJS.Timeout;
declare type DataKeyFunction = (time: number, prevValue: any) => any;
@ -96,7 +96,7 @@ export interface EntityDataSubscriptionOptions {
export class EntityDataSubscription {
constructor(private listener: EntityDataListener,
private telemetryService: TelemetryService,
private telemetryService: TelemetryWebsocketService,
private utils: UtilsService) {
this.initializeSubscription();
}

View File

@ -15,81 +15,41 @@
///
import { Inject, Injectable, NgZone } from '@angular/core';
import { select, Store } from '@ngrx/store';
import { Store } from '@ngrx/store';
import { AppState } from '@core/core.state';
import { AuthService } from '@core/auth/auth.service';
import { selectIsAuthenticated } from '@core/auth/auth.selectors';
import { WINDOW } from '@core/services/window.service';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { ActionNotificationShow } from '@core/notification/notification.actions';
import {
isNotificationCountUpdateMsg,
isNotificationsUpdateMsg,
MarkAllAsReadCmd,
MarkAsReadCmd,
NotificationCountUpdate,
NotificationPluginCmdsWrapper,
NotificationPluginCmdWrapper,
NotificationSubscriber,
NotificationsUpdate,
NotificationWsService,
UnreadCountSubCmd,
UnreadSubCmd,
UnsubscribeCmd,
WebsocketNotificationMsg
} from '@shared/models/notification-ws.models';
import Timeout = NodeJS.Timeout;
} from '@shared/models/websocket/notification-ws.models';
import { WebsocketService } from '@core/ws/websocket.service';
const RECONNECT_INTERVAL = 2000;
const WS_IDLE_TIMEOUT = 90000;
// @dynamic
@Injectable({
providedIn: 'root'
})
export class NotificationWebsocketService implements NotificationWsService {
export class NotificationWebsocketService extends WebsocketService<NotificationSubscriber> {
isActive = false;
isOpening = false;
isOpened = false;
isReconnect = false;
cmdWrapper: NotificationPluginCmdWrapper;
socketCloseTimer: Timeout;
reconnectTimer: Timeout;
lastCmdId = 0;
subscribersCount = 0;
subscribersMap = new Map<number, NotificationSubscriber>();
reconnectSubscribers = new Set<NotificationSubscriber>();
cmdsWrapper = new NotificationPluginCmdsWrapper();
notificationUri: string;
dataStream: WebSocketSubject<NotificationPluginCmdsWrapper | WebsocketNotificationMsg>;
constructor(private store: Store<AppState>,
private authService: AuthService,
private ngZone: NgZone,
@Inject(WINDOW) private window: Window) {
this.store.pipe(select(selectIsAuthenticated)).subscribe(
() => {
this.reset(true);
}
);
let port = this.window.location.port;
if (this.window.location.protocol === 'https:') {
if (!port) {
port = '443';
}
this.notificationUri = 'wss:';
} else {
if (!port) {
port = '80';
}
this.notificationUri = 'ws:';
}
this.notificationUri += `//${this.window.location.hostname}:${port}/api/ws/plugins/notifications`;
constructor(protected store: Store<AppState>,
protected authService: AuthService,
protected ngZone: NgZone,
@Inject(WINDOW) protected window: Window) {
super(store, authService, ngZone, 'api/ws/plugins/notifications', new NotificationPluginCmdWrapper(), window);
this.errorName = 'WebSocket Notification Error';
}
public subscribe(subscriber: NotificationSubscriber) {
@ -100,19 +60,19 @@ export class NotificationWebsocketService implements NotificationWsService {
this.subscribersMap.set(cmdId, subscriber);
subscriptionCommand.cmdId = cmdId;
if (subscriptionCommand instanceof UnreadCountSubCmd) {
this.cmdsWrapper.unreadCountSubCmd = subscriptionCommand;
this.cmdWrapper.unreadCountSubCmd = subscriptionCommand;
} else if (subscriptionCommand instanceof UnreadSubCmd) {
this.cmdsWrapper.unreadSubCmd = subscriptionCommand;
this.cmdWrapper.unreadSubCmd = subscriptionCommand;
} else if (subscriptionCommand instanceof MarkAsReadCmd) {
this.cmdsWrapper.markAsReadCmd = subscriptionCommand;
this.cmdWrapper.markAsReadCmd = subscriptionCommand;
this.subscribersMap.delete(cmdId);
} else if (subscriptionCommand instanceof MarkAllAsReadCmd) {
this.cmdsWrapper.markAllAsReadCmd = subscriptionCommand;
this.cmdWrapper.markAllAsReadCmd = subscriptionCommand;
this.subscribersMap.delete(cmdId);
}
}
);
if (this.cmdsWrapper.markAsReadCmd || this.cmdsWrapper.markAllAsReadCmd) {
if (this.cmdWrapper.markAsReadCmd || this.cmdWrapper.markAllAsReadCmd) {
this.subscribersCount++;
}
this.publishCommands();
@ -123,7 +83,7 @@ export class NotificationWebsocketService implements NotificationWsService {
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
if (subscriptionCommand.cmdId && subscriptionCommand instanceof UnreadSubCmd) {
this.cmdsWrapper.unreadSubCmd = subscriptionCommand;
this.cmdWrapper.unreadSubCmd = subscriptionCommand;
}
}
);
@ -139,7 +99,7 @@ export class NotificationWebsocketService implements NotificationWsService {
|| subscriptionCommand instanceof UnreadSubCmd) {
const unreadCountUnsubscribeCmd = new UnsubscribeCmd();
unreadCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdsWrapper.unsubCmd = unreadCountUnsubscribeCmd;
this.cmdWrapper.unsubCmd = unreadCountUnsubscribeCmd;
}
const cmdId = subscriptionCommand.cmdId;
if (cmdId) {
@ -153,184 +113,19 @@ export class NotificationWebsocketService implements NotificationWsService {
}
}
private nextCmdId(): number {
this.lastCmdId++;
return this.lastCmdId;
}
private publishCommands() {
while (this.isOpened && this.cmdsWrapper.hasCommands()) {
this.dataStream.next(this.cmdsWrapper.preparePublishCommands());
this.cmdsWrapper.clear();
this.checkToClose();
}
this.tryOpenSocket();
}
private checkToClose() {
if (this.subscribersCount === 0 && this.isOpened) {
if (!this.socketCloseTimer) {
this.socketCloseTimer = setTimeout(
() => this.closeSocket(), WS_IDLE_TIMEOUT);
processOnMessage(message: WebsocketNotificationMsg) {
let subscriber: NotificationSubscriber;
if (isNotificationCountUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message));
}
} else if (isNotificationsUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onNotificationsUpdate(new NotificationsUpdate(message));
}
}
}
private reset(close: boolean) {
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
this.lastCmdId = 0;
this.subscribersMap.clear();
this.subscribersCount = 0;
this.cmdsWrapper.clear();
if (close) {
this.closeSocket();
}
}
private closeSocket() {
this.isActive = false;
if (this.isOpened) {
this.dataStream.unsubscribe();
}
}
private tryOpenSocket() {
if (this.isActive) {
if (!this.isOpened && !this.isOpening) {
this.isOpening = true;
if (AuthService.isJwtTokenValid()) {
this.openSocket(AuthService.getJwtToken());
} else {
this.authService.refreshJwtToken().subscribe(() => {
this.openSocket(AuthService.getJwtToken());
},
() => {
this.isOpening = false;
this.authService.logout(true, true);
}
);
}
}
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
}
}
private openSocket(token: string) {
const uri = `${this.notificationUri}?token=${token}`;
this.dataStream = webSocket(
{
url: uri,
openObserver: {
next: () => {
this.onOpen();
}
},
closeObserver: {
next: (e: CloseEvent) => {
this.onClose(e);
}
}
}
);
this.dataStream.subscribe((message) => {
this.ngZone.runOutsideAngular(() => {
this.onMessage(message as WebsocketNotificationMsg);
});
},
(error) => {
this.onError(error);
});
}
private onOpen() {
this.isOpening = false;
this.isOpened = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.isReconnect) {
this.isReconnect = false;
this.reconnectSubscribers.forEach(
(reconnectSubscriber) => {
reconnectSubscriber.onReconnected();
this.subscribe(reconnectSubscriber);
}
);
this.reconnectSubscribers.clear();
} else {
this.publishCommands();
}
}
private onMessage(message: WebsocketNotificationMsg) {
if (message.errorCode) {
this.showWsError(message.errorCode, message.errorMsg);
} else {
let subscriber: NotificationSubscriber;
if (isNotificationCountUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onNotificationCountUpdate(new NotificationCountUpdate(message));
}
} else if (isNotificationsUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onNotificationsUpdate(new NotificationsUpdate(message));
}
}
}
this.checkToClose();
}
private onError(errorEvent) {
if (errorEvent) {
console.warn('WebSocket error event', errorEvent);
}
this.isOpening = false;
}
private onClose(closeEvent: CloseEvent) {
if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006
&& closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) {
this.showWsError(closeEvent.code, closeEvent.reason);
}
this.isOpening = false;
this.isOpened = false;
if (this.isActive) {
if (!this.isReconnect) {
this.reconnectSubscribers.clear();
this.subscribersMap.forEach(
(subscriber) => {
this.reconnectSubscribers.add(subscriber);
}
);
this.reset(false);
this.isReconnect = true;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL);
}
}
private showWsError(errorCode: number, errorMsg: string) {
let message = errorMsg;
if (!message) {
message += `WebSocket Notification Error: error code - ${errorCode}.`;
}
this.store.dispatch(new ActionNotificationShow(
{
message, type: 'error'
}));
}
}

View File

@ -20,7 +20,8 @@ import {
AlarmDataUnsubscribeCmd,
AlarmDataUpdate,
AttributesSubscriptionCmd,
EntityCountCmd, EntityCountUnsubscribeCmd,
EntityCountCmd,
EntityCountUnsubscribeCmd,
EntityCountUpdate,
EntityDataCmd,
EntityDataUnsubscribeCmd,
@ -33,72 +34,29 @@ import {
SubscriptionUpdate,
TelemetryFeature,
TelemetryPluginCmdsWrapper,
TelemetryService,
TelemetrySubscriber,
TimeseriesSubscriptionCmd,
WebsocketDataMsg
} from '@app/shared/models/telemetry/telemetry.models';
import { select, Store } from '@ngrx/store';
import { Store } from '@ngrx/store';
import { AppState } from '@core/core.state';
import { AuthService } from '@core/auth/auth.service';
import { selectIsAuthenticated } from '@core/auth/auth.selectors';
import { WINDOW } from '@core/services/window.service';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { ActionNotificationShow } from '@core/notification/notification.actions';
import Timeout = NodeJS.Timeout;
const RECONNECT_INTERVAL = 2000;
const WS_IDLE_TIMEOUT = 90000;
const MAX_PUBLISH_COMMANDS = 10;
import { WebsocketService } from '@core/ws/websocket.service';
// @dynamic
@Injectable({
providedIn: 'root'
})
export class TelemetryWebsocketService implements TelemetryService {
export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscriber> {
isActive = false;
isOpening = false;
isOpened = false;
isReconnect = false;
cmdWrapper: TelemetryPluginCmdsWrapper;
socketCloseTimer: Timeout;
reconnectTimer: Timeout;
lastCmdId = 0;
subscribersCount = 0;
subscribersMap = new Map<number, TelemetrySubscriber>();
reconnectSubscribers = new Set<TelemetrySubscriber>();
cmdsWrapper = new TelemetryPluginCmdsWrapper();
telemetryUri: string;
dataStream: WebSocketSubject<TelemetryPluginCmdsWrapper | WebsocketDataMsg>;
constructor(private store: Store<AppState>,
private authService: AuthService,
private ngZone: NgZone,
@Inject(WINDOW) private window: Window) {
this.store.pipe(select(selectIsAuthenticated)).subscribe(
() => {
this.reset(true);
}
);
let port = this.window.location.port;
if (this.window.location.protocol === 'https:') {
if (!port) {
port = '443';
}
this.telemetryUri = 'wss:';
} else {
if (!port) {
port = '80';
}
this.telemetryUri = 'ws:';
}
this.telemetryUri += `//${this.window.location.hostname}:${port}/api/ws/plugins/telemetry`;
constructor(protected store: Store<AppState>,
protected authService: AuthService,
protected ngZone: NgZone,
@Inject(WINDOW) protected window: Window) {
super(store, authService, ngZone, 'api/ws/plugins/telemetry', new TelemetryPluginCmdsWrapper(), window);
}
public subscribe(subscriber: TelemetrySubscriber) {
@ -110,18 +68,18 @@ export class TelemetryWebsocketService implements TelemetryService {
subscriptionCommand.cmdId = cmdId;
if (subscriptionCommand instanceof SubscriptionCmd) {
if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) {
this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
this.cmdWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
} else {
this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
this.cmdWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
}
} else if (subscriptionCommand instanceof GetHistoryCmd) {
this.cmdsWrapper.historyCmds.push(subscriptionCommand);
this.cmdWrapper.historyCmds.push(subscriptionCommand);
} else if (subscriptionCommand instanceof EntityDataCmd) {
this.cmdsWrapper.entityDataCmds.push(subscriptionCommand);
this.cmdWrapper.entityDataCmds.push(subscriptionCommand);
} else if (subscriptionCommand instanceof AlarmDataCmd) {
this.cmdsWrapper.alarmDataCmds.push(subscriptionCommand);
this.cmdWrapper.alarmDataCmds.push(subscriptionCommand);
} else if (subscriptionCommand instanceof EntityCountCmd) {
this.cmdsWrapper.entityCountCmds.push(subscriptionCommand);
this.cmdWrapper.entityCountCmds.push(subscriptionCommand);
}
}
);
@ -134,7 +92,7 @@ export class TelemetryWebsocketService implements TelemetryService {
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
if (subscriptionCommand.cmdId && subscriptionCommand instanceof EntityDataCmd) {
this.cmdsWrapper.entityDataCmds.push(subscriptionCommand);
this.cmdWrapper.entityDataCmds.push(subscriptionCommand);
}
}
);
@ -149,22 +107,22 @@ export class TelemetryWebsocketService implements TelemetryService {
if (subscriptionCommand instanceof SubscriptionCmd) {
subscriptionCommand.unsubscribe = true;
if (subscriptionCommand.getType() === TelemetryFeature.TIMESERIES) {
this.cmdsWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
this.cmdWrapper.tsSubCmds.push(subscriptionCommand as TimeseriesSubscriptionCmd);
} else {
this.cmdsWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
this.cmdWrapper.attrSubCmds.push(subscriptionCommand as AttributesSubscriptionCmd);
}
} else if (subscriptionCommand instanceof EntityDataCmd) {
const entityDataUnsubscribeCmd = new EntityDataUnsubscribeCmd();
entityDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdsWrapper.entityDataUnsubscribeCmds.push(entityDataUnsubscribeCmd);
this.cmdWrapper.entityDataUnsubscribeCmds.push(entityDataUnsubscribeCmd);
} else if (subscriptionCommand instanceof AlarmDataCmd) {
const alarmDataUnsubscribeCmd = new AlarmDataUnsubscribeCmd();
alarmDataUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdsWrapper.alarmDataUnsubscribeCmds.push(alarmDataUnsubscribeCmd);
this.cmdWrapper.alarmDataUnsubscribeCmds.push(alarmDataUnsubscribeCmd);
} else if (subscriptionCommand instanceof EntityCountCmd) {
const entityCountUnsubscribeCmd = new EntityCountUnsubscribeCmd();
entityCountUnsubscribeCmd.cmdId = subscriptionCommand.cmdId;
this.cmdsWrapper.entityCountUnsubscribeCmds.push(entityCountUnsubscribeCmd);
this.cmdWrapper.entityCountUnsubscribeCmds.push(entityCountUnsubscribeCmd);
}
const cmdId = subscriptionCommand.cmdId;
if (cmdId) {
@ -178,193 +136,29 @@ export class TelemetryWebsocketService implements TelemetryService {
}
}
private nextCmdId(): number {
this.lastCmdId++;
return this.lastCmdId;
}
private publishCommands() {
while (this.isOpened && this.cmdsWrapper.hasCommands()) {
this.dataStream.next(this.cmdsWrapper.preparePublishCommands(MAX_PUBLISH_COMMANDS));
this.checkToClose();
}
this.tryOpenSocket();
}
private checkToClose() {
if (this.subscribersCount === 0 && this.isOpened) {
if (!this.socketCloseTimer) {
this.socketCloseTimer = setTimeout(
() => this.closeSocket(), WS_IDLE_TIMEOUT);
processOnMessage(message: WebsocketDataMsg) {
let subscriber: TelemetrySubscriber;
if (isEntityDataUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onEntityData(new EntityDataUpdate(message));
}
} else if (isAlarmDataUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onAlarmData(new AlarmDataUpdate(message));
}
} else if (isEntityCountUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onEntityCount(new EntityCountUpdate(message));
}
} else if (message.subscriptionId) {
subscriber = this.subscribersMap.get(message.subscriptionId);
if (subscriber) {
subscriber.onData(new SubscriptionUpdate(message));
}
}
}
private reset(close: boolean) {
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
this.lastCmdId = 0;
this.subscribersMap.clear();
this.subscribersCount = 0;
this.cmdsWrapper.clear();
if (close) {
this.closeSocket();
}
}
private closeSocket() {
this.isActive = false;
if (this.isOpened) {
this.dataStream.unsubscribe();
}
}
private tryOpenSocket() {
if (this.isActive) {
if (!this.isOpened && !this.isOpening) {
this.isOpening = true;
if (AuthService.isJwtTokenValid()) {
this.openSocket(AuthService.getJwtToken());
} else {
this.authService.refreshJwtToken().subscribe(() => {
this.openSocket(AuthService.getJwtToken());
},
() => {
this.isOpening = false;
this.authService.logout(true, true);
}
);
}
}
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
}
}
private openSocket(token: string) {
const uri = `${this.telemetryUri}?token=${token}`;
this.dataStream = webSocket(
{
url: uri,
openObserver: {
next: () => {
this.onOpen();
}
},
closeObserver: {
next: (e: CloseEvent) => {
this.onClose(e);
}
}
}
);
this.dataStream.subscribe((message) => {
this.ngZone.runOutsideAngular(() => {
this.onMessage(message as WebsocketDataMsg);
});
},
(error) => {
this.onError(error);
});
}
private onOpen() {
this.isOpening = false;
this.isOpened = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.isReconnect) {
this.isReconnect = false;
this.reconnectSubscribers.forEach(
(reconnectSubscriber) => {
reconnectSubscriber.onReconnected();
this.subscribe(reconnectSubscriber);
}
);
this.reconnectSubscribers.clear();
} else {
this.publishCommands();
}
}
private onMessage(message: WebsocketDataMsg) {
if (message.errorCode) {
this.showWsError(message.errorCode, message.errorMsg);
} else {
let subscriber: TelemetrySubscriber;
if (isEntityDataUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onEntityData(new EntityDataUpdate(message));
}
} else if (isAlarmDataUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onAlarmData(new AlarmDataUpdate(message));
}
} else if (isEntityCountUpdateMsg(message)) {
subscriber = this.subscribersMap.get(message.cmdId);
if (subscriber) {
subscriber.onEntityCount(new EntityCountUpdate(message));
}
} else if (message.subscriptionId) {
subscriber = this.subscribersMap.get(message.subscriptionId);
if (subscriber) {
subscriber.onData(new SubscriptionUpdate(message));
}
}
}
this.checkToClose();
}
private onError(errorEvent) {
if (errorEvent) {
console.warn('WebSocket error event', errorEvent);
}
this.isOpening = false;
}
private onClose(closeEvent: CloseEvent) {
if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006
&& closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) {
this.showWsError(closeEvent.code, closeEvent.reason);
}
this.isOpening = false;
this.isOpened = false;
if (this.isActive) {
if (!this.isReconnect) {
this.reconnectSubscribers.clear();
this.subscribersMap.forEach(
(subscriber) => {
this.reconnectSubscribers.add(subscriber);
}
);
this.reset(false);
this.isReconnect = true;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL);
}
}
private showWsError(errorCode: number, errorMsg: string) {
let message = errorMsg;
if (!message) {
message += `WebSocket Error: error code - ${errorCode}.`;
}
this.store.dispatch(new ActionNotificationShow(
{
message, type: 'error'
}));
}
}

View File

@ -0,0 +1,245 @@
import { CmdWrapper, WsService, WsSubscriber } from '@shared/models/websocket/websocket.models';
import { select, Store } from '@ngrx/store';
import { AppState } from '@core/core.state';
import { AuthService } from '@core/auth/auth.service';
import { NgZone } from '@angular/core';
import { selectIsAuthenticated } from '@core/auth/auth.selectors';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { WebsocketNotificationMsg } from '@shared/models/websocket/notification-ws.models';
import { CmdUpdateMsg } from '@shared/models/telemetry/telemetry.models';
import { ActionNotificationShow } from '@core/notification/notification.actions';
import Timeout = NodeJS.Timeout;
const RECONNECT_INTERVAL = 2000;
const WS_IDLE_TIMEOUT = 90000;
const MAX_PUBLISH_COMMANDS = 10;
export abstract class WebsocketService<T extends WsSubscriber> implements WsService<T> {
isActive = false;
isOpening = false;
isOpened = false;
isReconnect = false;
socketCloseTimer: Timeout;
reconnectTimer: Timeout;
lastCmdId = 0;
subscribersCount = 0;
subscribersMap = new Map<number, T>();
reconnectSubscribers = new Set<T>();
notificationUri: string;
dataStream: WebSocketSubject<CmdWrapper | CmdUpdateMsg>;
errorName = 'WebSocket Error';
protected constructor(protected store: Store<AppState>,
protected authService: AuthService,
protected ngZone: NgZone,
protected apiEndpoint: string,
protected cmdWrapper: CmdWrapper,
protected window: Window) {
this.store.pipe(select(selectIsAuthenticated)).subscribe(
() => {
this.reset(true);
}
);
let port = this.window.location.port;
if (this.window.location.protocol === 'https:') {
if (!port) {
port = '443';
}
this.notificationUri = 'wss:';
} else {
if (!port) {
port = '80';
}
this.notificationUri = 'ws:';
}
this.notificationUri += `//${this.window.location.hostname}:${port}/${apiEndpoint}`;
}
abstract subscribe(subscriber: T);
abstract update(subscriber: T);
abstract unsubscribe(subscriber: T);
abstract processOnMessage(message: any);
protected nextCmdId(): number {
this.lastCmdId++;
return this.lastCmdId;
}
protected publishCommands() {
while (this.isOpened && this.cmdWrapper.hasCommands()) {
this.dataStream.next(this.cmdWrapper.preparePublishCommands(MAX_PUBLISH_COMMANDS));
this.checkToClose();
}
this.tryOpenSocket();
}
private checkToClose() {
if (this.subscribersCount === 0 && this.isOpened) {
if (!this.socketCloseTimer) {
this.socketCloseTimer = setTimeout(
() => this.closeSocket(), WS_IDLE_TIMEOUT);
}
}
}
private reset(close: boolean) {
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
this.lastCmdId = 0;
this.subscribersMap.clear();
this.subscribersCount = 0;
this.cmdWrapper.clear();
if (close) {
this.closeSocket();
}
}
private closeSocket() {
this.isActive = false;
if (this.isOpened) {
this.dataStream.unsubscribe();
}
}
private tryOpenSocket() {
if (this.isActive) {
if (!this.isOpened && !this.isOpening) {
this.isOpening = true;
if (AuthService.isJwtTokenValid()) {
this.openSocket(AuthService.getJwtToken());
} else {
this.authService.refreshJwtToken().subscribe({
next: () => {
this.openSocket(AuthService.getJwtToken());
},
error: () => {
this.isOpening = false;
this.authService.logout(true, true);
}
});
}
}
if (this.socketCloseTimer) {
clearTimeout(this.socketCloseTimer);
this.socketCloseTimer = null;
}
}
}
private openSocket(token: string) {
const uri = `${this.notificationUri}?token=${token}`;
this.dataStream = webSocket(
{
url: uri,
openObserver: {
next: () => {
this.onOpen();
}
},
closeObserver: {
next: (e: CloseEvent) => {
this.onClose(e);
}
}
}
);
this.dataStream.subscribe({
next: (message) => {
this.ngZone.runOutsideAngular(() => {
this.onMessage(message as WebsocketNotificationMsg);
});
},
error: (error) => {
this.onError(error);
}
});
}
private onOpen() {
this.isOpening = false;
this.isOpened = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.isReconnect) {
this.isReconnect = false;
this.reconnectSubscribers.forEach(
(reconnectSubscriber) => {
reconnectSubscriber.onReconnected();
this.subscribe(reconnectSubscriber);
}
);
this.reconnectSubscribers.clear();
} else {
this.publishCommands();
}
}
private onMessage(message: WebsocketNotificationMsg) {
if (message.errorCode) {
this.showWsError(message.errorCode, message.errorMsg);
} else {
this.processOnMessage(message);
}
this.checkToClose();
}
private onError(errorEvent) {
if (errorEvent) {
console.warn('WebSocket error event', errorEvent);
}
this.isOpening = false;
}
private onClose(closeEvent: CloseEvent) {
if (closeEvent && closeEvent.code > 1001 && closeEvent.code !== 1006
&& closeEvent.code !== 1011 && closeEvent.code !== 1012 && closeEvent.code !== 4500) {
this.showWsError(closeEvent.code, closeEvent.reason);
}
this.isOpening = false;
this.isOpened = false;
if (this.isActive) {
if (!this.isReconnect) {
this.reconnectSubscribers.clear();
this.subscribersMap.forEach(
(subscriber) => {
this.reconnectSubscribers.add(subscriber);
}
);
this.reset(false);
this.isReconnect = true;
}
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(() => this.tryOpenSocket(), RECONNECT_INTERVAL);
}
}
private showWsError(errorCode: number, errorMsg: string) {
let message = errorMsg;
if (!message) {
message += `${this.errorName}: error code - ${errorCode}.`;
}
this.store.dispatch(new ActionNotificationShow(
{
message, type: 'error'
}));
}
}

View File

@ -19,27 +19,36 @@ import {
ChangeDetectorRef,
Component,
NgZone,
OnInit,
OnDestroy,
Renderer2,
ViewContainerRef
} from '@angular/core';
import { NotificationWebsocketService } from '@core/ws/notification-websocket.service';
import { Observable } from 'rxjs';
import { distinctUntilChanged, publishReplay, refCount, tap } from 'rxjs/operators';
import { BehaviorSubject, ReplaySubject, Subscription } from 'rxjs';
import { distinctUntilChanged, share, tap } from 'rxjs/operators';
import { MatButton } from '@angular/material/button';
import { TbPopoverService } from '@shared/components/popover.service';
import { ShowNotificationPopoverComponent } from '@home/components/notification/show-notification-popover.component';
import { NotificationSubscriber } from '@shared/models/notification-ws.models';
import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models';
@Component({
selector: 'tb-notification-bell',
templateUrl: './notification-bell.component.html',
changeDetection: ChangeDetectionStrategy.OnPush
})
export class NotificationBellComponent implements OnInit {
export class NotificationBellComponent implements OnDestroy {
private notificationSubscriber: NotificationSubscriber;
count$: Observable<number>;
private notificationCountSubscriber: Subscription;
private countSubject = new BehaviorSubject(0);
count$ = this.countSubject.asObservable().pipe(
distinctUntilChanged(),
tap(() => setTimeout(() => this.cd.markForCheck())),
share({
connector: () => new ReplaySubject(1)
})
);
constructor(
private notificationWsService: NotificationWebsocketService,
@ -48,25 +57,18 @@ export class NotificationBellComponent implements OnInit {
private popoverService: TbPopoverService,
private renderer: Renderer2,
private viewContainerRef: ViewContainerRef) {
this.initSubscription();
}
ngOnInit() {
this.notificationSubscriber = NotificationSubscriber.createNotificationCountSubscription(
this.notificationWsService, this.zone);
this.notificationSubscriber.subscribe();
this.count$ = this.notificationSubscriber.notificationCount$.pipe(
distinctUntilChanged(),
publishReplay(1),
refCount(),
tap(() => setTimeout(() => this.cd.markForCheck())),
);
ngOnDestroy() {
this.unsubscribeSubscription();
}
showNotification($event: Event, createVersionButton: MatButton) {
if ($event) {
$event.stopPropagation();
}
this.unsubscribeSubscription();
const trigger = createVersionButton._elementRef.nativeElement;
if (this.popoverService.hasPopover(trigger)) {
this.popoverService.hidePopover(trigger);
@ -76,7 +78,9 @@ export class NotificationBellComponent implements OnInit {
{
onClose: () => {
showNotificationPopover.hide();
}
this.initSubscription();
},
counter: this.countSubject
},
{maxHeight: '90vh', height: '100%', padding: '10px'},
{width: '400px', minWidth: '100%', maxWidth: '100%'},
@ -84,4 +88,16 @@ export class NotificationBellComponent implements OnInit {
showNotificationPopover.tbComponentRef.instance.popoverComponent = showNotificationPopover;
}
}
private initSubscription() {
this.notificationSubscriber = NotificationSubscriber.createNotificationCountSubscription(this.notificationWsService, this.zone);
this.notificationCountSubscriber = this.notificationSubscriber.notificationCount$.subscribe(value => this.countSubject.next(value));
this.notificationSubscriber.subscribe();
}
private unsubscribeSubscription() {
this.notificationCountSubscriber.unsubscribe();
this.notificationSubscriber.unsubscribe();
}
}

View File

@ -26,7 +26,7 @@
<mat-divider></mat-divider>
<div *ngIf="(notifications$ | async).length; else emptyNotification" style="overflow: auto">
<section style="min-height: 100px; overflow: auto; padding: 10px 0;">
<div *ngFor="let notification of (notifications$ | async); let last = last">
<div *ngFor="let notification of (notifications$ | async); let last = last; trackBy: trackById">
<tb-notification [notification]="notification"
[onClose]="onClose"
(markAsRead)="markAsRead($event)">

View File

@ -19,12 +19,12 @@ import { PageComponent } from '@shared/components/page.component';
import { TbPopoverComponent } from '@shared/components/popover.component';
import { Store } from '@ngrx/store';
import { AppState } from '@core/core.state';
import { Notification } from '@shared/models/notification.models';
import { Notification, NotificationRequest } from '@shared/models/notification.models';
import { NotificationWebsocketService } from '@core/ws/notification-websocket.service';
import { Observable } from 'rxjs';
import { publishReplay, refCount, tap } from 'rxjs/operators';
import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs';
import { share, tap } from 'rxjs/operators';
import { Router } from '@angular/router';
import { NotificationSubscriber } from '@shared/models/notification-ws.models';
import { NotificationSubscriber } from '@shared/models/websocket/notification-ws.models';
@Component({
selector: 'tb-show-notification-popover',
@ -36,10 +36,15 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O
@Input()
onClose: () => void;
@Input()
counter: BehaviorSubject<number>;
@Input()
popoverComponent: TbPopoverComponent;
private notificationSubscriber: NotificationSubscriber;
private notificationCountSubscriber: Subscription;
notifications$: Observable<Notification[]>;
constructor(protected store: Store<AppState>,
@ -51,19 +56,22 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O
}
ngOnInit() {
this.notificationSubscriber = NotificationSubscriber.createNotificationsSubscription(
this.notificationWsService, this.zone);
this.notificationSubscriber = NotificationSubscriber.createNotificationsSubscription(this.notificationWsService, this.zone);
this.notifications$ = this.notificationSubscriber.notifications$.pipe(
publishReplay(1),
refCount(),
share({
connector: () => new ReplaySubject(1)
}),
tap(() => setTimeout(() => this.cd.markForCheck()))
);
this.notificationCountSubscriber = this.notificationSubscriber.notificationCount$.subscribe(value => this.counter.next(value));
this.notificationSubscriber.subscribe();
}
ngOnDestroy() {
super.ngOnDestroy();
this.notificationCountSubscriber.unsubscribe();
this.notificationSubscriber.unsubscribe();
this.onClose();
}
markAsRead(id: string) {
@ -86,4 +94,8 @@ export class ShowNotificationPopoverComponent extends PageComponent implements O
this.onClose();
this.router.navigateByUrl(this.router.createUrlTree(['notification-center'])).then(() => {});
}
trackById (index: number, item: NotificationRequest): string {
return item.id.id;
}
}

View File

@ -39,7 +39,8 @@ export * from './event.models';
export * from './login.models';
export * from './material.models';
export * from './notification.models';
export * from './notification-ws.models';
export * from './websocket/notification-ws.models';
export * from './websocket/websocket.models';
export * from './oauth2.models';
export * from './queue.models';
export * from './relation.models';

View File

@ -17,7 +17,7 @@
import { EntityType } from '@shared/models/entity-type.models';
import { AggregationType } from '../time/time.models';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { Observable, ReplaySubject } from 'rxjs';
import { EntityId } from '@shared/models/id/entity-id';
import { map } from 'rxjs/operators';
import { NgZone } from '@angular/core';
@ -33,6 +33,8 @@ import { PageData } from '@shared/models/page/page-data';
import { alarmFields } from '@shared/models/alarm.models';
import { entityFields } from '@shared/models/entity.models';
import { isUndefined } from '@core/utils';
import { CmdWrapper, WsSubscriber } from '@shared/models/websocket/websocket.models';
import { TelemetryWebsocketService } from '@core/ws/telemetry-websocket.service';
export enum DataKeyType {
timeseries = 'timeseries',
@ -234,7 +236,7 @@ export class AlarmDataUnsubscribeCmd implements WebsocketCmd {
cmdId: number;
}
export class TelemetryPluginCmdsWrapper {
export class TelemetryPluginCmdsWrapper implements CmdWrapper {
constructor() {
this.attrSubCmds = [];
@ -564,31 +566,20 @@ export class EntityCountUpdate extends CmdUpdate {
}
}
export interface TelemetryService {
subscribe(subscriber: TelemetrySubscriber);
update(subscriber: TelemetrySubscriber);
unsubscribe(subscriber: TelemetrySubscriber);
}
export class TelemetrySubscriber {
export class TelemetrySubscriber extends WsSubscriber {
private dataSubject = new ReplaySubject<SubscriptionUpdate>(1);
private entityDataSubject = new ReplaySubject<EntityDataUpdate>(1);
private alarmDataSubject = new ReplaySubject<AlarmDataUpdate>(1);
private entityCountSubject = new ReplaySubject<EntityCountUpdate>(1);
private reconnectSubject = new Subject<void>();
private tsOffset = undefined;
public subscriptionCommands: Array<WebsocketCmd>;
public data$ = this.dataSubject.asObservable();
public entityData$ = this.entityDataSubject.asObservable();
public alarmData$ = this.alarmDataSubject.asObservable();
public entityCount$ = this.entityCountSubject.asObservable();
public reconnect$ = this.reconnectSubject.asObservable();
public static createEntityAttributesSubscription(telemetryService: TelemetryService,
public static createEntityAttributesSubscription(telemetryService: TelemetryWebsocketService,
entityId: EntityId, attributeScope: TelemetryType,
zone: NgZone, keys: string[] = null): TelemetrySubscriber {
let subscriptionCommand: SubscriptionCmd;
@ -608,21 +599,8 @@ export class TelemetrySubscriber {
return subscriber;
}
constructor(private telemetryService: TelemetryService, private zone?: NgZone) {
this.subscriptionCommands = [];
}
public subscribe() {
this.telemetryService.subscribe(this);
}
public update() {
this.telemetryService.update(this);
}
public unsubscribe() {
this.telemetryService.unsubscribe(this);
this.complete();
constructor(private telemetryService: TelemetryWebsocketService, protected zone?: NgZone) {
super(telemetryService, zone);
}
public complete() {
@ -630,7 +608,7 @@ export class TelemetrySubscriber {
this.entityDataSubject.complete();
this.alarmDataSubject.complete();
this.entityCountSubject.complete();
this.reconnectSubject.complete();
super.complete();
}
public setTsOffset(tsOffset: number): boolean {
@ -707,10 +685,6 @@ export class TelemetrySubscriber {
}
}
public onReconnected() {
this.reconnectSubject.next();
}
public attributeData$(): Observable<Array<AttributeData>> {
const attributeData = new Array<AttributeData>();
return this.data$.pipe(

View File

@ -14,12 +14,14 @@
/// limitations under the License.
///
import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs';
import { BehaviorSubject, ReplaySubject } from 'rxjs';
import { CmdUpdate, CmdUpdateMsg, CmdUpdateType, WebsocketCmd } from '@shared/models/telemetry/telemetry.models';
import { first, map } from 'rxjs/operators';
import { NgZone } from '@angular/core';
import { isDefinedAndNotNull } from '@core/utils';
import { Notification } from '@shared/models/notification.models';
import { CmdWrapper, WsSubscriber } from '@shared/models/websocket/websocket.models';
import { NotificationWebsocketService } from '@core/ws/notification-websocket.service';
export class NotificationCountUpdate extends CmdUpdate {
totalUnreadCount: number;
@ -43,26 +45,16 @@ export class NotificationsUpdate extends CmdUpdate {
}
}
export interface NotificationWsService {
subscribe(subscriber: NotificationSubscriber);
update(subscriber: NotificationSubscriber);
unsubscribe(subscriber: NotificationSubscriber);
}
export class NotificationSubscriber {
export class NotificationSubscriber extends WsSubscriber {
private notificationCountSubject = new ReplaySubject<NotificationCountUpdate>(1);
private notificationsSubject = new BehaviorSubject<NotificationsUpdate>(null);
private reconnectSubject = new Subject<void>();
public subscriptionCommands: Array<WebsocketCmd>;
public messageLimit = 10;
public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount));
public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg?.notifications || []));
public static createNotificationCountSubscription(notificationWsService: NotificationWsService,
public static createNotificationCountSubscription(notificationWsService: NotificationWebsocketService,
zone: NgZone): NotificationSubscriber {
const subscriptionCommand = new UnreadCountSubCmd();
const subscriber = new NotificationSubscriber(notificationWsService, zone);
@ -70,7 +62,7 @@ export class NotificationSubscriber {
return subscriber;
}
public static createNotificationsSubscription(notificationWsService: NotificationWsService,
public static createNotificationsSubscription(notificationWsService: NotificationWebsocketService,
zone: NgZone, limit = 10): NotificationSubscriber {
const subscriptionCommand = new UnreadSubCmd(limit);
const subscriber = new NotificationSubscriber(notificationWsService, zone);
@ -79,7 +71,7 @@ export class NotificationSubscriber {
return subscriber;
}
public static createMarkAsReadCommand(notificationWsService: NotificationWsService,
public static createMarkAsReadCommand(notificationWsService: NotificationWebsocketService,
ids: string[]): NotificationSubscriber {
const subscriptionCommand = new MarkAsReadCmd(ids);
const subscriber = new NotificationSubscriber(notificationWsService);
@ -87,38 +79,15 @@ export class NotificationSubscriber {
return subscriber;
}
public static createMarkAllAsReadCommand(notificationWsService: NotificationWsService): NotificationSubscriber {
public static createMarkAllAsReadCommand(notificationWsService: NotificationWebsocketService): NotificationSubscriber {
const subscriptionCommand = new MarkAllAsReadCmd();
const subscriber = new NotificationSubscriber(notificationWsService);
subscriber.subscriptionCommands.push(subscriptionCommand);
return subscriber;
}
constructor(private notificationWsService: NotificationWsService, private zone?: NgZone) {
this.subscriptionCommands = [];
}
public subscribe() {
this.notificationWsService.subscribe(this);
}
public update() {
this.notificationWsService.update(this);
}
public unsubscribe() {
this.notificationWsService.unsubscribe(this);
this.complete();
}
public onReconnected() {
this.reconnectSubject.next();
}
public complete() {
this.notificationCountSubject.complete();
this.notificationsSubject.complete();
this.reconnectSubject.complete();
constructor(private notificationWsService: NotificationWebsocketService, protected zone?: NgZone) {
super(notificationWsService, zone);
}
onNotificationCountUpdate(message: NotificationCountUpdate) {
@ -133,6 +102,12 @@ export class NotificationSubscriber {
}
}
public complete() {
this.notificationCountSubject.complete();
this.notificationsSubject.complete();
super.complete();
}
onNotificationsUpdate(message: NotificationsUpdate) {
this.notificationsSubject.asObservable().pipe(
first()
@ -221,7 +196,7 @@ export function isNotificationsUpdateMsg(message: WebsocketNotificationMsg): mes
return updateMsg.cmdId !== undefined && updateMsg.cmdUpdateType === CmdUpdateType.NOTIFICATIONS;
}
export class NotificationPluginCmdsWrapper {
export class NotificationPluginCmdWrapper implements CmdWrapper {
constructor() {
this.unreadCountSubCmd = null;
@ -253,13 +228,14 @@ export class NotificationPluginCmdsWrapper {
this.markAllAsReadCmd = null;
}
public preparePublishCommands(): NotificationPluginCmdsWrapper {
const preparedWrapper = new NotificationPluginCmdsWrapper();
public preparePublishCommands(): NotificationPluginCmdWrapper {
const preparedWrapper = new NotificationPluginCmdWrapper();
preparedWrapper.unreadCountSubCmd = this.unreadCountSubCmd || undefined;
preparedWrapper.unreadSubCmd = this.unreadSubCmd || undefined;
preparedWrapper.unsubCmd = this.unsubCmd || undefined;
preparedWrapper.markAsReadCmd = this.markAsReadCmd || undefined;
preparedWrapper.markAllAsReadCmd = this.markAllAsReadCmd || undefined;
this.clear();
return preparedWrapper;
}
}

View File

@ -0,0 +1,51 @@
import { NgZone } from '@angular/core';
import { WebsocketCmd } from '@shared/models/telemetry/telemetry.models';
import { Subject } from 'rxjs';
export interface WsService<T extends WsSubscriber> {
subscribe(subscriber: T);
update(subscriber: T);
unsubscribe(subscriber: T);
}
export abstract class CmdWrapper {
abstract hasCommands(): boolean;
abstract clear(): void;
abstract preparePublishCommands(maxCommands: number): CmdWrapper;
[key: string]: WebsocketCmd | any;
}
export abstract class WsSubscriber {
protected reconnectSubject = new Subject<void>();
subscriptionCommands: Array<WebsocketCmd>;
reconnect$ = this.reconnectSubject.asObservable();
protected constructor(protected wsService: WsService<WsSubscriber>, protected zone?: NgZone) {
this.subscriptionCommands = [];
}
public subscribe() {
this.wsService.subscribe(this);
}
public update() {
this.wsService.update(this);
}
public unsubscribe() {
this.wsService.unsubscribe(this);
this.complete();
}
public complete() {
this.reconnectSubject.complete();
}
public onReconnected() {
this.reconnectSubject.next();
}
}