/*
 * Copyright © 2016-2020 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import 'angular-websocket';

import thingsboardTypes from '../common/types.constant';

export default angular.module('thingsboard.api.telemetryWebsocket', [thingsboardTypes])
    .factory('telemetryWebsocketService', TelemetryWebsocketService)
    .name;

// Delay (ms, passed to $timeout) before retrying to open a closed socket.
const RECONNECT_INTERVAL = 2000;
// Delay (ms, passed to $timeout) before closing a socket that has no subscribers.
const WS_IDLE_TIMEOUT = 90000;
// Upper bound on the number of commands packed into a single outgoing frame.
const MAX_PUBLISH_COMMANDS = 10;

/**
 * Factory for the telemetry WebSocket service.
 *
 * Maintains one shared WebSocket to `/api/ws/plugins/telemetry`, queues
 * subscription commands and sends them in batches, routes incoming messages
 * to the subscriber registered for `subscriptionId`, reconnects after an
 * unexpected close, and closes the socket after an idle period with no
 * subscribers.
 *
 * A subscriber, as used by this service, is an object with:
 *  - `subscriptionCommands` (optional array) and `type`
 *  - `historyCommands` (optional array)
 *  - `onData(data, subscriptionId)` callback
 *  - `onReconnected()` optional callback (NOTE(review): restored, see onOpen)
 *
 * @returns {{subscribe: function(Object), unsubscribe: function(Object)}}
 */
/*@ngInject*/
function TelemetryWebsocketService($rootScope, $websocket, $timeout, $window, $mdUtil, $log, toast, types,
                                   userService) {

    let isOpening = false;
    let isOpened = false;
    // True while at least one subscribe() happened and the socket was not deliberately closed.
    let isActive = false;
    let isReconnect = false;
    // Subscribers captured when the socket dropped; re-subscribed in onOpen().
    let reconnectSubscribers = [];
    let lastCmdId = 0;
    // cmdId -> subscriber
    let subscribers = {};
    let subscribersCount = 0;
    // cmdId -> command object
    let commands = {};
    // Pending outgoing commands, grouped the way they are sent in a frame.
    const cmdsWrapper = {
        tsSubCmds: [],
        historyCmds: [],
        attrSubCmds: []
    };
    let telemetryUri;
    let dataStream;
    const location = $window.location;
    let socketCloseTimer;
    let reconnectTimer;

    // Build the ws(s):// endpoint from the page location, defaulting the port by scheme.
    let port = location.port;
    if (location.protocol === "https:") {
        if (!port) {
            port = "443";
        }
        telemetryUri = "wss:";
    } else {
        if (!port) {
            port = "80";
        }
        telemetryUri = "ws:";
    }
    telemetryUri += "//" + location.hostname + ":" + port;
    telemetryUri += "/api/ws/plugins/telemetry";

    const service = {
        subscribe: subscribe,
        unsubscribe: unsubscribe
    };

    // Drop all state and close the socket on logout and on (re)login.
    $rootScope.telemetryWsLogoutHandle = $rootScope.$on('unauthenticated', function (event, doLogout) {
        if (doLogout) {
            reset(true);
        }
    });

    $rootScope.telemetryWsLoginHandle = $rootScope.$on('authenticated', function () {
        reset(true);
    });

    return service;

    /**
     * Sends queued commands in batches while the socket is open, then makes
     * sure a socket is open (or being opened) for anything still queued.
     */
    function publishCommands() {
        while (isOpened && hasCommands()) {
            dataStream.send(preparePublishCommands()).then(function () {
                checkToClose();
            });
        }
        tryOpenSocket();
    }

    /** @returns {boolean} true when any outgoing queue is non-empty. */
    function hasCommands() {
        return cmdsWrapper.tsSubCmds.length > 0 ||
            cmdsWrapper.historyCmds.length > 0 ||
            cmdsWrapper.attrSubCmds.length > 0;
    }

    /**
     * Removes up to MAX_PUBLISH_COMMANDS commands from the queues, taking
     * timeseries first, then history, then attribute commands.
     *
     * @returns {{tsSubCmds: Array, historyCmds: Array, attrSubCmds: Array}} frame payload
     */
    function preparePublishCommands() {
        const preparedWrapper = {};
        let leftCount = MAX_PUBLISH_COMMANDS;
        preparedWrapper.tsSubCmds = popCmds(cmdsWrapper.tsSubCmds, leftCount);
        leftCount -= preparedWrapper.tsSubCmds.length;
        preparedWrapper.historyCmds = popCmds(cmdsWrapper.historyCmds, leftCount);
        leftCount -= preparedWrapper.historyCmds.length;
        preparedWrapper.attrSubCmds = popCmds(cmdsWrapper.attrSubCmds, leftCount);
        return preparedWrapper;
    }

    /**
     * Removes and returns at most `leftCount` commands from the front of `cmds`.
     *
     * @param {Array} cmds queue to take from (mutated)
     * @param {number} leftCount remaining capacity of the frame
     * @returns {Array} removed commands, possibly empty
     */
    function popCmds(cmds, leftCount) {
        const toPublish = Math.min(cmds.length, leftCount);
        if (toPublish > 0) {
            return cmds.splice(0, toPublish);
        } else {
            return [];
        }
    }

    /** Socket error handler: logs the event and clears the "opening" flag. */
    function onError(errorEvent) {
        if (errorEvent) {
            $log.warn('WebSocket error event', errorEvent);
        }
        isOpening = false;
    }

    /**
     * Socket open handler. After a reconnect, re-subscribes every subscriber
     * captured in onClose(); otherwise flushes the pending command queues.
     */
    function onOpen() {
        isOpening = false;
        isOpened = true;
        if (reconnectTimer) {
            $timeout.cancel(reconnectTimer);
            reconnectTimer = null;
        }
        if (isReconnect) {
            isReconnect = false;
            // NOTE(review): this loop body was lost in the collapsed source and has been
            // restored; confirm against version control, in particular the optional
            // onReconnected() hook on subscribers.
            for (let r = 0; r < reconnectSubscribers.length; r++) {
                const reconnectSubscriber = reconnectSubscribers[r];
                if (reconnectSubscriber.onReconnected) {
                    reconnectSubscriber.onReconnected();
                }
                subscribe(reconnectSubscriber);
            }
            reconnectSubscribers = [];
        } else {
            publishCommands();
        }
    }

    /**
     * Socket close handler. Shows an error for close codes above 1000 other
     * than 1006, and, if the service is still active, remembers the current
     * subscribers and schedules a reconnect attempt.
     *
     * @param {Object} closeEvent close event; `code` and `reason` are read
     */
    function onClose(closeEvent) {
        if (closeEvent && closeEvent.code > 1000 && closeEvent.code !== 1006) {
            showWsError(closeEvent.code, closeEvent.reason);
        }
        isOpening = false;
        isOpened = false;
        if (isActive) {
            if (!isReconnect) {
                // A subscriber may own several cmdIds; collect each subscriber once.
                reconnectSubscribers = [];
                for (const id in subscribers) {
                    const subscriber = subscribers[id];
                    if (reconnectSubscribers.indexOf(subscriber) === -1) {
                        reconnectSubscribers.push(subscriber);
                    }
                }
                // Command ids and queues are rebuilt when the subscribers are re-subscribed.
                reset(false);
                isReconnect = true;
            }
            if (reconnectTimer) {
                $timeout.cancel(reconnectTimer);
            }
            reconnectTimer = $timeout(tryOpenSocket, RECONNECT_INTERVAL, false);
        }
    }

    /**
     * Socket message handler. Shows server-reported errors; otherwise makes
     * sure `data.data` has an (empty) array for every key of the originating
     * command and hands the message to the subscriber's onData().
     *
     * @param {Object} message raw message; `message.data` is parsed with angular.fromJson
     */
    function onMessage(message) {
        if (message.data) {
            const data = angular.fromJson(message.data);
            if (data.errorCode) {
                showWsError(data.errorCode, data.errorMsg);
            } else if (data.subscriptionId) {
                const subscriber = subscribers[data.subscriptionId];
                if (subscriber) {
                    const keys = fetchKeys(data.subscriptionId);
                    if (!data.data) {
                        data.data = {};
                    }
                    for (let k = 0; k < keys.length; k++) {
                        const key = keys[k];
                        if (!data.data[key]) {
                            data.data[key] = [];
                        }
                    }
                    subscriber.onData(data, data.subscriptionId);
                }
            }
        }
        checkToClose();
    }

    /**
     * Shows a toast with the given message, or with the error code when no
     * message is available.
     */
    function showWsError(errorCode, errorMsg) {
        let message = 'WebSocket Error: ';
        if (errorMsg) {
            message += errorMsg;
        } else {
            message += "error code - " + errorCode + ".";
        }
        $mdUtil.nextTick(function () {
            toast.showError(message);
        });
    }

    /**
     * @param {number|string} subscriptionId command id
     * @returns {Array<string>} keys of the registered command (its comma-separated
     *          `keys` string split), or an empty array
     */
    function fetchKeys(subscriptionId) {
        const command = commands[subscriptionId];
        if (command && command.keys && command.keys.length > 0) {
            return command.keys.split(",");
        } else {
            return [];
        }
    }

    /** @returns {number} the next command id (pre-incremented counter). */
    function nextCmdId() {
        lastCmdId++;
        return lastCmdId;
    }

    /** Assigns a fresh cmdId to `command` and records it for `subscriber`. */
    function registerCommand(subscriber, command) {
        const cmdId = nextCmdId();
        subscribers[cmdId] = subscriber;
        command.cmdId = cmdId;
        commands[cmdId] = command;
    }

    /** Removes the bookkeeping entries for `cmdId`, if any. */
    function forgetCommand(cmdId) {
        if (cmdId) {
            if (subscribers[cmdId]) {
                delete subscribers[cmdId];
            }
            if (commands[cmdId]) {
                delete commands[cmdId];
            }
        }
    }

    /**
     * Queues a subscription command on the queue matching the subscriber type.
     * NOTE(review): the `types.dataKeyType.*` routing was restored together with the
     * lost part of subscribe()/unsubscribe(); confirm against version control.
     */
    function enqueueSubscriptionCommand(subscriber, subscriptionCommand) {
        if (subscriber.type === types.dataKeyType.timeseries) {
            cmdsWrapper.tsSubCmds.push(subscriptionCommand);
        } else if (subscriber.type === types.dataKeyType.attribute) {
            cmdsWrapper.attrSubCmds.push(subscriptionCommand);
        }
    }

    /**
     * Registers a subscriber: assigns command ids, queues its commands and
     * triggers publishing (which opens the socket if needed).
     *
     * @param {Object} subscriber see the factory documentation for the fields read
     */
    function subscribe(subscriber) {
        isActive = true;
        // NOTE(review): everything after the first loop header was lost in the collapsed
        // source and has been restored; confirm against version control.
        if (angular.isDefined(subscriber.subscriptionCommands)) {
            for (let i = 0; i < subscriber.subscriptionCommands.length; i++) {
                const subscriptionCommand = subscriber.subscriptionCommands[i];
                registerCommand(subscriber, subscriptionCommand);
                enqueueSubscriptionCommand(subscriber, subscriptionCommand);
            }
        }
        if (angular.isDefined(subscriber.historyCommands)) {
            for (let j = 0; j < subscriber.historyCommands.length; j++) {
                const historyCommand = subscriber.historyCommands[j];
                registerCommand(subscriber, historyCommand);
                cmdsWrapper.historyCmds.push(historyCommand);
            }
        }
        subscribersCount++;
        publishCommands();
    }

    /**
     * Unregisters a subscriber: queues unsubscribe commands, drops its command
     * bookkeeping, removes it from the pending reconnect list and triggers
     * publishing. No-op when the service is not active.
     *
     * @param {Object} subscriber a subscriber previously passed to subscribe()
     */
    function unsubscribe(subscriber) {
        if (isActive) {
            // NOTE(review): the part of this function before the reconnectSubscribers
            // lookup was lost in the collapsed source and has been restored (including
            // the `unsubscribe = true` flag); confirm against version control.
            if (subscriber.subscriptionCommands) {
                for (let i = 0; i < subscriber.subscriptionCommands.length; i++) {
                    const subscriptionCommand = subscriber.subscriptionCommands[i];
                    subscriptionCommand.unsubscribe = true;
                    enqueueSubscriptionCommand(subscriber, subscriptionCommand);
                    forgetCommand(subscriptionCommand.cmdId);
                }
            }
            if (subscriber.historyCommands) {
                for (let j = 0; j < subscriber.historyCommands.length; j++) {
                    forgetCommand(subscriber.historyCommands[j].cmdId);
                }
            }
            const index = reconnectSubscribers.indexOf(subscriber);
            if (index > -1) {
                reconnectSubscribers.splice(index, 1);
            }
            // onClose() zeroes the counter via reset(false) while the service stays active;
            // an unsubscribe during that window must not drive it negative, otherwise it
            // never returns to 0 and the idle close in checkToClose() is never armed.
            subscribersCount = Math.max(0, subscribersCount - 1);
            publishCommands();
        }
    }

    /** Arms the idle-close timer when the socket is open and nobody is subscribed. */
    function checkToClose() {
        if (subscribersCount === 0 && isOpened) {
            if (!socketCloseTimer) {
                socketCloseTimer = $timeout(closeSocket, WS_IDLE_TIMEOUT, false);
            }
        }
    }

    /**
     * Opens the socket if the service is active and no socket is open or being
     * opened, refreshing the JWT first when it is no longer valid. Also cancels
     * a pending idle-close timer.
     */
    function tryOpenSocket() {
        if (isActive) {
            if (!isOpened && !isOpening) {
                isOpening = true;
                if (userService.isJwtTokenValid()) {
                    openSocket(userService.getJwtToken());
                } else {
                    userService.refreshJwtToken().then(function success() {
                        openSocket(userService.getJwtToken());
                    }, function fail() {
                        isOpening = false;
                        $rootScope.$broadcast('unauthenticated');
                    });
                }
            }
            if (socketCloseTimer) {
                $timeout.cancel(socketCloseTimer);
                socketCloseTimer = null;
            }
        }
    }

    /**
     * Creates the socket and attaches the handlers.
     *
     * @param {string} token JWT sent as the `token` query parameter
     */
    function openSocket(token) {
        // Escape the token so it cannot break out of the query parameter.
        dataStream = $websocket(telemetryUri + '?token=' + encodeURIComponent(token));
        dataStream.onError(onError);
        dataStream.onOpen(onOpen);
        dataStream.onClose(onClose);
        dataStream.onMessage(onMessage, {autoApply: false});
    }

    /** Marks the service inactive (so onClose() does not reconnect) and closes an open socket. */
    function closeSocket() {
        isActive = false;
        if (isOpened) {
            dataStream.close();
        }
    }

    /**
     * Clears command ids, subscriber bookkeeping and the outgoing queues.
     *
     * @param {boolean} close when truthy, also closes the socket
     */
    function reset(close) {
        if (socketCloseTimer) {
            $timeout.cancel(socketCloseTimer);
            socketCloseTimer = null;
        }
        lastCmdId = 0;
        subscribers = {};
        subscribersCount = 0;
        commands = {};
        cmdsWrapper.tsSubCmds = [];
        cmdsWrapper.historyCmds = [];
        cmdsWrapper.attrSubCmds = [];
        if (close) {
            closeSocket();
        }
    }
}