/*
 * Copyright © 2016-2017 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import 'angular-websocket';
import thingsboardTypes from '../common/types.constant';
// Angular module that exposes the telemetry websocket service; the module's
// name is the default export so other modules can list it as a dependency.
const telemetryWebsocketModule = angular
    .module('thingsboard.api.telemetryWebsocket', [thingsboardTypes])
    .factory('telemetryWebsocketService', TelemetryWebsocketService);

export default telemetryWebsocketModule.name;

// Delay (ms) passed to $timeout before another connection attempt is made
// after the socket closed while the service was still active.
const RECONNECT_INTERVAL = 5000;

// Delay (ms) passed to $timeout before an open socket with no subscribers
// is closed.
const WS_IDLE_TIMEOUT = 90000;
/*@ngInject*/
function TelemetryWebsocketService($rootScope, $websocket, $timeout, $window, types, userService) {
    var isOpening = false,
 | 
						|
        isOpened = false,
 | 
						|
        isActive = false,
 | 
						|
        isReconnect = false,
 | 
						|
        reconnectSubscribers = [],
 | 
						|
        lastCmdId = 0,
 | 
						|
        subscribers = {},
 | 
						|
        subscribersCount = 0,
 | 
						|
        cmdsWrapper = {
 | 
						|
            tsSubCmds: [],
 | 
						|
            historyCmds: [],
 | 
						|
            attrSubCmds: []
 | 
						|
        },
 | 
						|
        telemetryUri,
 | 
						|
        dataStream,
 | 
						|
        location = $window.location,
 | 
						|
        socketCloseTimer,
 | 
						|
        reconnectTimer;
 | 
						|
 | 
						|
    if (location.protocol === "https:") {
 | 
						|
        telemetryUri = "wss:";
 | 
						|
    } else {
 | 
						|
        telemetryUri = "ws:";
 | 
						|
    }
 | 
						|
    telemetryUri += "//" + location.hostname + ":" + location.port;
 | 
						|
    telemetryUri += "/api/ws/plugins/telemetry";
 | 
						|
 | 
						|
 | 
						|
    var service = {
 | 
						|
        subscribe: subscribe,
 | 
						|
        unsubscribe: unsubscribe
 | 
						|
    }
 | 
						|
 | 
						|
    $rootScope.telemetryWsLogoutHandle = $rootScope.$on('unauthenticated', function (event, doLogout) {
 | 
						|
        if (doLogout) {
 | 
						|
            reset(true);
 | 
						|
        }
 | 
						|
    });
 | 
						|
 | 
						|
    return service;
    /**
     * Sends every queued command in one message when the socket is open and
     * at least one command is pending, then empties the queues. Always ends
     * by calling tryOpenSocket().
     */
    function publishCommands () {
        var hasQueuedCommands = cmdsWrapper.tsSubCmds.length > 0 ||
            cmdsWrapper.historyCmds.length > 0 ||
            cmdsWrapper.attrSubCmds.length > 0;

        if (isOpened && hasQueuedCommands) {
            // Send a copy: the queues are emptied right below.
            var outgoing = angular.copy(cmdsWrapper);
            dataStream.send(outgoing).then(function () {
                checkToClose();
            });
            cmdsWrapper.tsSubCmds = [];
            cmdsWrapper.historyCmds = [];
            cmdsWrapper.attrSubCmds = [];
        }

        tryOpenSocket();
    }
    /**
     * Socket error callback: clears the "opening" flag so that a later
     * tryOpenSocket() call is allowed to start a new attempt.
     */
    function onError (/*message*/) {
        isOpening = false;
    }
    /**
     * Socket "open" callback.
     *
     * Marks the socket as open and cancels any pending reconnect timer. On a
     * reconnection every remembered subscriber is notified through its
     * optional onReconnected() hook and registered again via subscribe();
     * otherwise the queued commands are published.
     */
    function onOpen () {
        isOpening = false;
        isOpened = true;

        if (reconnectTimer) {
            $timeout.cancel(reconnectTimer);
            reconnectTimer = null;
        }

        if (!isReconnect) {
            publishCommands();
            return;
        }

        isReconnect = false;
        // Iterate array elements only: for...in would also visit any other
        // enumerable property on the array and hand it to subscribe().
        reconnectSubscribers.forEach(function (reconnectSubscriber) {
            if (reconnectSubscriber.onReconnected) {
                reconnectSubscriber.onReconnected();
            }
            subscribe(reconnectSubscriber);
        });
        reconnectSubscribers = [];
    }
    /**
     * Socket "close" callback.
     *
     * Clears the open/opening flags. While the service is still active, the
     * current subscribers are remembered for re-registration (once per
     * disconnect), the subscription state is reset, and a new connection
     * attempt is scheduled after RECONNECT_INTERVAL.
     */
    function onClose () {
        isOpening = false;
        isOpened = false;

        if (!isActive) {
            return;
        }

        if (!isReconnect) {
            var remembered = [];
            for (var cmdId in subscribers) {
                remembered.push(subscribers[cmdId]);
            }
            reconnectSubscribers = remembered;
            reset(false);
            isReconnect = true;
        }

        // Keep at most one reconnect timer pending.
        if (reconnectTimer) {
            $timeout.cancel(reconnectTimer);
        }
        reconnectTimer = $timeout(tryOpenSocket, RECONNECT_INTERVAL, false);
    }
    /**
     * Socket "message" callback: parses the payload and forwards its `data`
     * to the subscriber registered under `subscriptionId`, when both exist.
     * Always finishes with checkToClose().
     *
     * @param {Object} message - websocket message; only `message.data` is read.
     */
    function onMessage (message) {
        if (message.data) {
            var payload = angular.fromJson(message.data);
            var target = payload.subscriptionId ? subscribers[payload.subscriptionId] : undefined;
            if (target && payload.data) {
                target.onData(payload.data);
            }
        }
        checkToClose();
    }
    /**
     * Advances the command id counter and returns the new value.
     *
     * @returns {number} the id to use for the next command.
     */
    function nextCmdId () {
        lastCmdId += 1;
        return lastCmdId;
    }
    /**
     * Registers a subscriber under a fresh command id, queues its command
     * (time-series / attribute subscription, or history request) and
     * publishes the queue.
     *
     * @param {Object} subscriber - carries either `subscriptionCommand`
     *     (together with `type`) or `historyCommand`; the assigned id is
     *     written to the command's `cmdId`.
     */
    function subscribe (subscriber) {
        var cmdId = nextCmdId();
        subscribers[cmdId] = subscriber;
        subscribersCount++;

        if (angular.isDefined(subscriber.subscriptionCommand)) {
            var subscriptionCommand = subscriber.subscriptionCommand;
            subscriptionCommand.cmdId = cmdId;
            switch (subscriber.type) {
                case types.dataKeyType.timeseries:
                    cmdsWrapper.tsSubCmds.push(subscriptionCommand);
                    break;
                case types.dataKeyType.attribute:
                    cmdsWrapper.attrSubCmds.push(subscriptionCommand);
                    break;
            }
        } else if (angular.isDefined(subscriber.historyCommand)) {
            var historyCommand = subscriber.historyCommand;
            historyCommand.cmdId = cmdId;
            cmdsWrapper.historyCmds.push(historyCommand);
        }

        publishCommands();
    }
    /**
     * Removes a subscriber. For subscription commands an "unsubscribe"
     * command is queued; history subscribers are only forgotten locally.
     *
     * A subscriber whose command id no longer maps to itself (already
     * unsubscribed, or wiped by reset()) is ignored apart from being dropped
     * from the pending reconnect list. Without this guard the counter could
     * become negative, which stops checkToClose() from ever scheduling the
     * idle close, and a reused command id could remove an unrelated
     * subscriber.
     *
     * @param {Object} subscriber - the object previously passed to subscribe().
     */
    function unsubscribe (subscriber) {
        var command = subscriber.subscriptionCommand || subscriber.historyCommand;

        if (command && subscribers[command.cmdId] !== subscriber) {
            // Not currently registered: make sure it is not re-subscribed
            // after a reconnect, and leave all counters untouched.
            var pendingIndex = reconnectSubscribers.indexOf(subscriber);
            if (pendingIndex > -1) {
                reconnectSubscribers.splice(pendingIndex, 1);
            }
            return;
        }

        if (subscriber.subscriptionCommand) {
            subscriber.subscriptionCommand.unsubscribe = true;
            if (subscriber.type === types.dataKeyType.timeseries) {
                cmdsWrapper.tsSubCmds.push(subscriber.subscriptionCommand);
            } else if (subscriber.type === types.dataKeyType.attribute) {
                cmdsWrapper.attrSubCmds.push(subscriber.subscriptionCommand);
            }
            delete subscribers[subscriber.subscriptionCommand.cmdId];
        } else if (subscriber.historyCommand) {
            delete subscribers[subscriber.historyCommand.cmdId];
        }
        subscribersCount--;
        publishCommands();
    }
    /**
     * Schedules closeSocket() after WS_IDLE_TIMEOUT when the socket is open,
     * nobody is subscribed and no close timer is pending yet.
     */
    function checkToClose () {
        var isIdle = subscribersCount === 0 && isOpened;
        if (isIdle && !socketCloseTimer) {
            socketCloseTimer = $timeout(closeSocket, WS_IDLE_TIMEOUT, false);
        }
    }
    /**
     * Marks the service active and, unless the socket is already open or
     * opening, starts a connection using the current JWT token, refreshing
     * the token first when it is no longer valid. A failed refresh
     * broadcasts 'unauthenticated'. Any pending idle-close timer is
     * cancelled.
     */
    function tryOpenSocket () {
        isActive = true;

        if (!(isOpened || isOpening)) {
            isOpening = true;
            if (userService.isJwtTokenValid()) {
                openSocket(userService.getJwtToken());
            } else {
                userService.refreshJwtToken().then(
                    function success() {
                        openSocket(userService.getJwtToken());
                    },
                    function fail() {
                        isOpening = false;
                        $rootScope.$broadcast('unauthenticated');
                    }
                );
            }
        }

        // The socket is wanted again, so a pending idle close is dropped.
        if (socketCloseTimer) {
            $timeout.cancel(socketCloseTimer);
            socketCloseTimer = null;
        }
    }
    /**
     * Creates the websocket for the telemetry endpoint, passing the token as
     * the `token` query parameter, and attaches the service's callbacks.
     *
     * @param {string} token - JWT token appended to the telemetry URI.
     */
    function openSocket(token) {
        var socketUrl = telemetryUri + '?token=' + token;
        dataStream = $websocket(socketUrl);
        dataStream.onError(onError);
        dataStream.onOpen(onOpen);
        dataStream.onClose(onClose);
        dataStream.onMessage(onMessage);
    }
    /**
     * Marks the service inactive (so onClose() does not schedule a
     * reconnect) and closes the socket when it is open.
     */
    function closeSocket() {
        isActive = false;
        if (!isOpened) {
            return;
        }
        dataStream.close();
    }
    /**
     * Clears all subscription state: cancels the idle-close timer, restarts
     * command ids at zero, forgets every subscriber and empties the command
     * queues.
     *
     * The parameter must not be named `closeSocket`: that would shadow the
     * closeSocket() function, so `reset(true)` would try to call the boolean
     * and throw a TypeError.
     *
     * @param {boolean} close - when truthy, the socket is closed as well.
     */
    function reset(close) {
        if (socketCloseTimer) {
            $timeout.cancel(socketCloseTimer);
            socketCloseTimer = null;
        }
        lastCmdId = 0;
        subscribers = {};
        subscribersCount = 0;
        cmdsWrapper.tsSubCmds = [];
        cmdsWrapper.historyCmds = [];
        cmdsWrapper.attrSubCmds = [];
        if (close) {
            closeSocket();
        }
    }
}