178 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			178 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
/*
 | 
						|
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
'use strict';
 | 
						|
 | 
						|
const config = require('config'),
 | 
						|
    JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'),
 | 
						|
    logger = require('../config/logger')._logger('serviceBusTemplate');
 | 
						|
const {ServiceBusClient, ServiceBusAdministrationClient} = require("@azure/service-bus");
 | 
						|
 | 
						|
const requestTopic = config.get('request_topic');
 | 
						|
const namespaceName = config.get('service_bus.namespace_name');
 | 
						|
const sasKeyName = config.get('service_bus.sas_key_name');
 | 
						|
const sasKey = config.get('service_bus.sas_key');
 | 
						|
const queueProperties = config.get('service_bus.queue_properties');
 | 
						|
 | 
						|
let sbClient;
 | 
						|
let receiver;
 | 
						|
let serviceBusService;
 | 
						|
 | 
						|
let queueOptions = {};
 | 
						|
const queues = [];
 | 
						|
const senderMap = new Map();
 | 
						|
 | 
						|
function ServiceBusProducer() {
 | 
						|
    this.send = async (responseTopic, scriptId, rawResponse, headers) => {
 | 
						|
        if (!queues.includes(requestTopic)) {
 | 
						|
            await createQueueIfNotExist(requestTopic);
 | 
						|
            queues.push(requestTopic);
 | 
						|
        }
 | 
						|
 | 
						|
        let customSender = senderMap.get(responseTopic);
 | 
						|
 | 
						|
        if (!customSender) {
 | 
						|
            customSender = new CustomSender(responseTopic);
 | 
						|
            senderMap.set(responseTopic, customSender);
 | 
						|
        }
 | 
						|
 | 
						|
        let data = {
 | 
						|
            key: scriptId,
 | 
						|
            data: [...rawResponse],
 | 
						|
            headers: headers
 | 
						|
        };
 | 
						|
 | 
						|
        return customSender.send({body: data});
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
function CustomSender(topic) {
 | 
						|
    this.sender = sbClient.createSender(topic);
 | 
						|
 | 
						|
    this.send = async (message) => {
 | 
						|
        return this.sender.sendMessages(message);
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
(async () => {
 | 
						|
    try {
 | 
						|
        logger.info('Starting ThingsBoard JavaScript Executor Microservice...');
 | 
						|
 | 
						|
        const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`;
 | 
						|
        sbClient = new ServiceBusClient(connectionString)
 | 
						|
        serviceBusService = new ServiceBusAdministrationClient(connectionString);
 | 
						|
 | 
						|
        parseQueueProperties();
 | 
						|
 | 
						|
        await new Promise((resolve, reject) => {
 | 
						|
            serviceBusService.listQueues((err, data) => {
 | 
						|
                if (err) {
 | 
						|
                    reject(err);
 | 
						|
                } else {
 | 
						|
                    for (const queue of data) {
 | 
						|
                        queues.push(queue.name);
 | 
						|
                    }
 | 
						|
                    resolve();
 | 
						|
                }
 | 
						|
            });
 | 
						|
        });
 | 
						|
 | 
						|
        if (!queues.includes(requestTopic)) {
 | 
						|
            await createQueueIfNotExist(requestTopic);
 | 
						|
            queues.push(requestTopic);
 | 
						|
        }
 | 
						|
 | 
						|
        receiver = sbClient.createReceiver(requestTopic, {receiveMode: 'peekLock'});
 | 
						|
 | 
						|
        const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer());
 | 
						|
 | 
						|
        const messageHandler = async (message) => {
 | 
						|
            if (message) {
 | 
						|
                messageProcessor.onJsInvokeMessage(message.body);
 | 
						|
                await message.complete();
 | 
						|
            }
 | 
						|
        };
 | 
						|
        const errorHandler = (error) => {
 | 
						|
            logger.error('Failed to receive message from queue.', error);
 | 
						|
        };
 | 
						|
        receiver.subscribe({processMessage: messageHandler, processError: errorHandler})
 | 
						|
    } catch (e) {
 | 
						|
        logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
 | 
						|
        logger.error(e.stack);
 | 
						|
        await exit(-1);
 | 
						|
    }
 | 
						|
})();
 | 
						|
 | 
						|
async function createQueueIfNotExist(topic) {
 | 
						|
    return new Promise((resolve, reject) => {
 | 
						|
        serviceBusService.createQueue(topic, queueOptions, (err) => {
 | 
						|
            if (err && err.code !== "MessageEntityAlreadyExistsError") {
 | 
						|
                reject(err);
 | 
						|
            } else {
 | 
						|
                resolve();
 | 
						|
            }
 | 
						|
        });
 | 
						|
    });
 | 
						|
}
 | 
						|
 | 
						|
function parseQueueProperties() {
 | 
						|
    let properties = {};
 | 
						|
    const props = queueProperties.split(';');
 | 
						|
    props.forEach(p => {
 | 
						|
        const delimiterPosition = p.indexOf(':');
 | 
						|
        properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1);
 | 
						|
    });
 | 
						|
    queueOptions = {
 | 
						|
        requiresDuplicateDetection: false,
 | 
						|
        maxSizeInMegabytes: properties['maxSizeInMb'],
 | 
						|
        defaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`,
 | 
						|
        lockDuration: `PT${properties['lockDurationInSec']}S`
 | 
						|
    };
 | 
						|
}
 | 
						|
 | 
						|
process.on('exit', () => {
 | 
						|
    exit(0);
 | 
						|
});
 | 
						|
 | 
						|
async function exit(status) {
 | 
						|
    logger.info('Exiting with status: %d ...', status);
 | 
						|
    logger.info('Stopping Azure Service Bus resources...')
 | 
						|
    if (receiver) {
 | 
						|
        try {
 | 
						|
            await receiver.close();
 | 
						|
        } catch (e) {
 | 
						|
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    senderMap.forEach((k, v) => {
 | 
						|
        try {
 | 
						|
            v.sender.close();
 | 
						|
        } catch (e) {
 | 
						|
 | 
						|
        }
 | 
						|
    });
 | 
						|
 | 
						|
    if (sbClient) {
 | 
						|
        try {
 | 
						|
            sbClient.close();
 | 
						|
        } catch (e) {
 | 
						|
 | 
						|
        }
 | 
						|
    }
 | 
						|
    logger.info('Azure Service Bus resources stopped.')
 | 
						|
    process.exit(status);
 | 
						|
}
 |