2022-07-07 12:43:22 +03:00
|
|
|
///
|
2024-01-09 10:46:16 +02:00
|
|
|
/// Copyright © 2016-2024 The Thingsboard Authors
|
2022-07-07 12:43:22 +03:00
|
|
|
///
|
|
|
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
/// you may not use this file except in compliance with the License.
|
|
|
|
|
/// You may obtain a copy of the License at
|
|
|
|
|
///
|
|
|
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
///
|
|
|
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
/// See the License for the specific language governing permissions and
|
|
|
|
|
/// limitations under the License.
|
|
|
|
|
///
|
|
|
|
|
|
|
|
|
|
import config from 'config';
|
|
|
|
|
import { _logger } from '../config/logger';
|
|
|
|
|
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
|
|
|
|
|
import { IQueue } from './queue.models';
|
|
|
|
|
import {
|
|
|
|
|
CreateQueueOptions,
|
|
|
|
|
ProcessErrorArgs,
|
|
|
|
|
ServiceBusAdministrationClient,
|
|
|
|
|
ServiceBusClient,
|
|
|
|
|
ServiceBusReceivedMessage,
|
|
|
|
|
ServiceBusReceiver,
|
|
|
|
|
ServiceBusSender
|
|
|
|
|
} from '@azure/service-bus';
|
|
|
|
|
|
|
|
|
|
/**
 * Azure Service Bus implementation of {@link IQueue}.
 *
 * After {@link ServiceBusTemplate.init} the instance is subscribed to the configured
 * request queue and forwards the body of every received message to a
 * {@link JsInvokeMessageProcessor}. Responses are published with
 * {@link ServiceBusTemplate.send}, which keeps one cached sender per response queue.
 */
export class ServiceBusTemplate implements IQueue {

    private logger = _logger(`serviceBusTemplate`);
    private queuePrefix: string = config.get('queue_prefix');
    private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
    private namespaceName: string = config.get('service_bus.namespace_name');
    private sasKeyName: string = config.get('service_bus.sas_key_name');
    private sasKey: string = config.get('service_bus.sas_key');
    // Expected format: `name:value` pairs separated by ';' (see parseQueueProperties).
    private queueProperties: string = config.get('service_bus.queue_properties');

    // Optional because they only exist between init() and destroy().
    private sbClient?: ServiceBusClient;
    private serviceBusService?: ServiceBusAdministrationClient;
    private receiver?: ServiceBusReceiver;

    private queueOptions: CreateQueueOptions = {};
    // Names of queues known to exist (listed at init or created by this instance).
    private queues = new Set<string>();
    // One sender per response queue, created lazily by send().
    private senderMap = new Map<string, ServiceBusSender>();

    name = 'Azure Service Bus';

    /**
     * Connects to the namespace, makes sure the request queue exists and subscribes to it.
     *
     * @throws if listing the queues or creating the request queue fails.
     */
    async init() {
        const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`;
        const sbClient = new ServiceBusClient(connectionString);
        const adminClient = new ServiceBusAdministrationClient(connectionString);
        this.sbClient = sbClient;
        this.serviceBusService = adminClient;

        this.parseQueueProperties();

        for await (const queue of adminClient.listQueues()) {
            this.queues.add(queue.name);
        }
        await this.ensureQueue(this.requestTopic);

        // The handlers close over the local `receiver`, so they keep working on the same
        // object even after destroy() has cleared `this.receiver`.
        const receiver = sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'});
        this.receiver = receiver;

        const messageProcessor = new JsInvokeMessageProcessor(this);

        const messageHandler = async (message: ServiceBusReceivedMessage) => {
            if (message) {
                messageProcessor.onJsInvokeMessage(message.body);
                await receiver.completeMessage(message);
            }
        };
        const errorHandler = async (error: ProcessErrorArgs) => {
            this.logger.error('Failed to receive message from queue.', error);
        };
        receiver.subscribe({processMessage: messageHandler, processError: errorHandler});
    }

    /**
     * Publishes a response to `responseTopic`, creating that queue first if it is not known yet.
     *
     * @param responseTopic name of the queue the response is sent to
     * @param msgKey value stored in the `key` field of the message body
     * @param rawResponse payload; sent as a plain array of byte values in the `data` field
     * @param headers value stored unchanged in the `headers` field of the message body
     * @returns the promise returned by the sender's `sendMessages` call
     * @throws if the template has not been initialised, or queue creation / sending fails
     */
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
        const sbClient = this.sbClient;
        if (!sbClient) {
            throw new Error('Azure Service Bus client is not initialized.');
        }

        // The queue that must exist is the one being written to. The request queue is
        // already guaranteed by init(), so checking it here would never create anything.
        await this.ensureQueue(responseTopic);

        let customSender = this.senderMap.get(responseTopic);
        if (!customSender) {
            customSender = sbClient.createSender(responseTopic);
            this.senderMap.set(responseTopic, customSender);
        }

        const data = {
            key: msgKey,
            data: [...rawResponse],
            headers: headers
        };
        return customSender.sendMessages({body: data});
    }

    /**
     * Creates `topic` unless it is already recorded in `queues`, then records it.
     */
    private async ensureQueue(topic: string): Promise<void> {
        if (this.queues.has(topic)) {
            return;
        }
        await this.createQueueIfNotExist(topic);
        this.queues.add(topic);
    }

    /**
     * Builds `queueOptions` from the `queueProperties` string.
     *
     * Recognised keys are `maxSizeInMb`, `messageTimeToLiveInSec` and `lockDurationInSec`.
     * A key that is absent or not a positive number is left out of the options instead of
     * being turned into `NaN` / `PTundefinedS`.
     */
    private parseQueueProperties() {
        const properties = new Map<string, string>();
        for (const entry of (this.queueProperties ?? '').split(';')) {
            const delimiterPosition = entry.indexOf(':');
            if (delimiterPosition <= 0) {
                // Empty segment (e.g. trailing ';') or an entry without a name.
                continue;
            }
            properties.set(entry.substring(0, delimiterPosition).trim(), entry.substring(delimiterPosition + 1).trim());
        }

        const options: CreateQueueOptions = {requiresDuplicateDetection: false};

        const maxSizeInMb = this.positiveNumber(properties, 'maxSizeInMb');
        if (maxSizeInMb !== undefined) {
            options.maxSizeInMegabytes = maxSizeInMb;
        }
        // Durations are formatted as ISO-8601 second counts, e.g. `PT30S`.
        const messageTimeToLiveInSec = this.positiveNumber(properties, 'messageTimeToLiveInSec');
        if (messageTimeToLiveInSec !== undefined) {
            options.defaultMessageTimeToLive = `PT${messageTimeToLiveInSec}S`;
        }
        const lockDurationInSec = this.positiveNumber(properties, 'lockDurationInSec');
        if (lockDurationInSec !== undefined) {
            options.lockDuration = `PT${lockDurationInSec}S`;
        }

        this.queueOptions = options;
    }

    /**
     * Reads `key` from `properties` as a positive finite number.
     *
     * @returns the number, or `undefined` when the key is missing or its value is invalid
     *          (an invalid value is additionally logged).
     */
    private positiveNumber(properties: Map<string, string>, key: string): number | undefined {
        const raw = properties.get(key);
        if (raw === undefined) {
            return undefined;
        }
        const value = Number(raw);
        if (raw === '' || !Number.isFinite(value) || value <= 0) {
            this.logger.error(`Ignoring invalid Service Bus queue property ${key}: '${raw}'`);
            return undefined;
        }
        return value;
    }

    /**
     * Creates the queue `topic` with the parsed `queueOptions`.
     * An error whose `code` is `MessageEntityAlreadyExistsError` is treated as success.
     *
     * @throws the original error for every other failure, so its `code` and stack survive.
     */
    private async createQueueIfNotExist(topic: string) {
        const adminClient = this.serviceBusService;
        if (!adminClient) {
            throw new Error('Azure Service Bus administration client is not initialized.');
        }
        try {
            await adminClient.createQueue(topic, this.queueOptions);
        } catch (err: unknown) {
            const code = (err as { code?: unknown } | null | undefined)?.code;
            if (code === "MessageEntityAlreadyExistsError") {
                return;
            }
            throw err instanceof Error ? err : new Error(String(err));
        }
    }

    /**
     * Closes the receiver, all cached senders and the client, in that order.
     * A failure while closing is logged and does not stop the remaining steps.
     */
    async destroy() {
        this.logger.info('Stopping Azure Service Bus resources...');

        const receiver = this.receiver;
        if (receiver) {
            this.logger.info('Stopping Service Bus Receiver...');
            // Cleared before closing so a repeated destroy() does not close it twice.
            this.receiver = undefined;
            try {
                await receiver.close();
                this.logger.info('Service Bus Receiver stopped.');
            } catch (e) {
                this.logger.error('Service Bus Receiver stop error.', e);
            }
        }

        this.logger.info('Stopping Service Bus Senders...');
        const senders = Array.from(this.senderMap.values(), (sender) => sender.close());
        this.senderMap.clear();
        try {
            await Promise.all(senders);
            this.logger.info('Service Bus Senders stopped.');
        } catch (e) {
            this.logger.error('Service Bus Senders stop error.', e);
        }

        const sbClient = this.sbClient;
        if (sbClient) {
            this.logger.info('Stopping Service Bus Client...');
            this.sbClient = undefined;
            try {
                await sbClient.close();
                this.logger.info('Service Bus Client stopped.');
            } catch (e) {
                this.logger.error('Service Bus Client stop error.', e);
            }
        }

        this.logger.info('Azure Service Bus resources stopped.');
    }
}
|