/*
 * Copyright © 2016-2017 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import 'angular-websocket';
import thingsboardTypes from '../common/types.constant';

// Registers the `telemetryWebsocketService` factory in its own Angular module
// and exports the module name so that other modules can list it as a dependency.
export default angular.module('thingsboard.api.telemetryWebsocket', [thingsboardTypes])
    .factory('telemetryWebsocketService', TelemetryWebsocketService)
    .name;

// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably the reconnect delay used by the (missing) close handler — confirm.
const RECONNECT_INTERVAL = 2000;
// Delay handed to $timeout in checkToClose() before a socket with no
// subscribers is closed.
const WS_IDLE_TIMEOUT = 90000;

/**
 * Factory for the telemetry websocket service.
 *
 * Keeps a single websocket (`dataStream`) to the telemetry endpoint, queues
 * subscription / history commands in `cmdsWrapper` and flushes them once the
 * socket is open. The returned object exposes `subscribe` and `unsubscribe`.
 *
 * NOTE(review): part of this function body is missing in this copy of the
 * file (see the note inside onOpen). The code below is kept exactly as found.
 *
 * @param $rootScope  used to listen for / broadcast the 'unauthenticated' event
 * @param $websocket  factory called with a URI to create the data stream
 * @param $timeout    used for the idle-close timer and to cancel timers
 * @param $window     source of `location` for building the websocket URI
 * @param types       provides `dataKeyType.timeseries` / `dataKeyType.attribute`
 * @param userService provides isJwtTokenValid / getJwtToken / refreshJwtToken
 * @returns {{subscribe: Function, unsubscribe: Function}}
 */
/*@ngInject*/
function TelemetryWebsocketService($rootScope, $websocket, $timeout, $window, types, userService) {

    var isOpening = false,          // an open attempt (or token refresh) is in flight
        isOpened = false,           // set in onOpen(); gates sending and closing
        isActive = false,           // set by subscribe(), cleared by closeSocket()
        isReconnect = false,        // read in onOpen(); where it is set is not visible here
        reconnectSubscribers = [],  // only its declaration is visible in this copy
        lastCmdId = 0,              // last id handed out by nextCmdId()
        subscribers = {},           // cmdId -> subscriber
        subscribersCount = 0,       // number of entries in `subscribers`
        cmdsWrapper = {             // commands queued until the next publishCommands()
            tsSubCmds: [],
            historyCmds: [],
            attrSubCmds: []
        },
        telemetryUri,
        dataStream,
        location = $window.location,
        socketCloseTimer,           // pending idle-close $timeout promise, if any
        reconnectTimer;             // cancelled in onOpen(); where it is set is not visible here

    // Build the websocket URI from the page location: wss for https pages, ws otherwise.
    if (location.protocol === "https:") {
        telemetryUri = "wss:";
    } else {
        telemetryUri = "ws:";
    }
    // NOTE(review): location.port is an empty string when the page is served on
    // the protocol's default port, which produces "host:" with an empty port
    // here — confirm the websocket client accepts that form.
    telemetryUri += "//" + location.hostname + ":" + location.port;
    telemetryUri += "/api/ws/plugins/telemetry";

    // NOTE(review): no terminating semicolon; this relies on automatic
    // semicolon insertion at the following line break.
    var service = {
        subscribe: subscribe,
        unsubscribe: unsubscribe
    }

    // On an 'unauthenticated' event with a truthy doLogout argument, drop all
    // state and close the socket. The value returned by $on is stored on $rootScope.
    $rootScope.telemetryWsLogoutHandle = $rootScope.$on('unauthenticated', function (event, doLogout) {
        if (doLogout) {
            reset(true);
        }
    });

    return service;

    /**
     * Sends the queued commands if the socket is open and at least one queue is
     * non-empty, then empties the queues. Always calls tryOpenSocket() afterwards.
     */
    function publishCommands () {
        if (isOpened && (cmdsWrapper.tsSubCmds.length > 0 ||
                         cmdsWrapper.historyCmds.length > 0 ||
                         cmdsWrapper.attrSubCmds.length > 0)) {
            // A copy is sent because the queues are replaced right below.
            // NOTE(review): no rejection handler is attached to this promise.
            dataStream.send(angular.copy(cmdsWrapper)).then(function () {
                checkToClose();
            });
            cmdsWrapper.tsSubCmds = [];
            cmdsWrapper.historyCmds = [];
            cmdsWrapper.attrSubCmds = [];
        }
        tryOpenSocket();
    }

    /** Socket error handler: clears the "opening" flag. */
    function onError (/*message*/) {
        isOpening = false;
    }

    /**
     * Socket open handler: marks the socket as opened and cancels any pending
     * reconnect timer.
     */
    function onOpen () {
        isOpening = false;
        isOpened = true;
        if (reconnectTimer) {
            $timeout.cancel(reconnectTimer);
            reconnectTimer = null;
        }
        if (isReconnect) {
            isReconnect = false;
            // NOTE(review): SOURCE IS TRUNCATED HERE. The text jumps from the
            // middle of this loop header to the tail of what looks like a
            // different function (one returning `command.keys.split(",")`).
            // The remainder of onOpen and the definitions of onClose and
            // onMessage (both referenced by openSocket below) are missing, so
            // the file does not parse as-is. Restore this region from version
            // control; the tokens below are left exactly as found.
            for (var r=0; r 0) {
                return command.keys.split(",");
            } else {
                return [];
            }
    }

    /** Increments the command id counter and returns the new value. */
    function nextCmdId () {
        lastCmdId++;
        return lastCmdId;
    }

    /**
     * Registers a subscriber under a fresh command id, queues its command and
     * publishes the queue.
     *
     * The subscriber carries either `subscriptionCommand` (queued to tsSubCmds
     * or attrSubCmds depending on `subscriber.type`) or `historyCommand`
     * (queued to historyCmds). The assigned id is written to the command's
     * `cmdId` property.
     *
     * @param subscriber object with `type` and `subscriptionCommand` or `historyCommand`
     */
    function subscribe (subscriber) {
        isActive = true;
        var cmdId = nextCmdId();
        subscribers[cmdId] = subscriber;
        subscribersCount++;
        if (angular.isDefined(subscriber.subscriptionCommand)) {
            subscriber.subscriptionCommand.cmdId = cmdId;
            // A subscription command whose type is neither timeseries nor
            // attribute is registered above but not queued.
            if (subscriber.type === types.dataKeyType.timeseries) {
                cmdsWrapper.tsSubCmds.push(subscriber.subscriptionCommand);
            } else if (subscriber.type === types.dataKeyType.attribute) {
                cmdsWrapper.attrSubCmds.push(subscriber.subscriptionCommand);
            }
        } else if (angular.isDefined(subscriber.historyCommand)) {
            subscriber.historyCommand.cmdId = cmdId;
            cmdsWrapper.historyCmds.push(subscriber.historyCommand);
        }
        publishCommands();
    }

    /**
     * Removes a subscriber and publishes the queue. Does nothing unless the
     * service is active.
     *
     * For a subscription command, the same command object is flagged with
     * `unsubscribe = true` and queued again. For a history command nothing is
     * queued; the subscriber is only removed locally.
     *
     * @param subscriber the object previously passed to subscribe()
     */
    function unsubscribe (subscriber) {
        if (isActive) {
            var cmdId = null;
            if (subscriber.subscriptionCommand) {
                subscriber.subscriptionCommand.unsubscribe = true;
                if (subscriber.type === types.dataKeyType.timeseries) {
                    cmdsWrapper.tsSubCmds.push(subscriber.subscriptionCommand);
                } else if (subscriber.type === types.dataKeyType.attribute) {
                    cmdsWrapper.attrSubCmds.push(subscriber.subscriptionCommand);
                }
                cmdId = subscriber.subscriptionCommand.cmdId;
            } else if (subscriber.historyCommand) {
                cmdId = subscriber.historyCommand.cmdId;
            }
            // Only decrement the counter when the id is still registered.
            if (cmdId && subscribers[cmdId]) {
                delete subscribers[cmdId];
                subscribersCount--;
            }
            publishCommands();
        }
    }

    /**
     * Schedules closeSocket() after WS_IDLE_TIMEOUT when the socket is open,
     * no subscribers remain and no close timer is already pending.
     */
    function checkToClose () {
        if (subscribersCount === 0 && isOpened) {
            if (!socketCloseTimer) {
                // The third argument (false) is $timeout's invokeApply flag.
                socketCloseTimer = $timeout(closeSocket, WS_IDLE_TIMEOUT, false);
            }
        }
    }

    /**
     * While the service is active: opens the socket if it is neither open nor
     * opening (refreshing the JWT first when it is not valid), and cancels any
     * pending idle-close timer.
     */
    function tryOpenSocket () {
        if (isActive) {
            if (!isOpened && !isOpening) {
                isOpening = true;
                if (userService.isJwtTokenValid()) {
                    openSocket(userService.getJwtToken());
                } else {
                    userService.refreshJwtToken().then(function success() {
                        openSocket(userService.getJwtToken());
                    }, function fail() {
                        // Broadcast without a doLogout argument, so the listener
                        // registered above does not call reset(true) for it.
                        isOpening = false;
                        $rootScope.$broadcast('unauthenticated');
                    });
                }
            }
            if (socketCloseTimer) {
                $timeout.cancel(socketCloseTimer);
                socketCloseTimer = null;
            }
        }
    }

    /**
     * Creates the websocket with the token as a query parameter and attaches
     * the event handlers.
     *
     * NOTE(review): the token is concatenated without URL-encoding — confirm
     * it can never contain characters that need escaping.
     * NOTE(review): onClose and onMessage are not defined in the visible part
     * of this file (see the truncation note in onOpen).
     *
     * @param token JWT obtained from userService.getJwtToken()
     */
    function openSocket(token) {
        dataStream = $websocket(telemetryUri + '?token=' + token);
        dataStream.onError(onError);
        dataStream.onOpen(onOpen);
        dataStream.onClose(onClose);
        dataStream.onMessage(onMessage, {autoApply: false});
    }

    /** Marks the service inactive and closes the socket if it is open. */
    function closeSocket() {
        isActive = false;
        if (isOpened) {
            dataStream.close();
        }
    }

    /**
     * Cancels the idle-close timer and clears the id counter, the subscriber
     * registry and all queued commands.
     *
     * @param close when truthy, also calls closeSocket()
     */
    function reset(close) {
        if (socketCloseTimer) {
            $timeout.cancel(socketCloseTimer);
            socketCloseTimer = null;
        }
        lastCmdId = 0;
        subscribers = {};
        subscribersCount = 0;
        cmdsWrapper.tsSubCmds = [];
        cmdsWrapper.historyCmds = [];
        cmdsWrapper.attrSubCmds = [];
        if (close) {
            closeSocket();
        }
    }
}