// thingsboard/msa/js-executor/queue/pubSubTemplate.ts

///
/// Copyright © 2016-2023 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import config from 'config';
import { _logger } from '../config/logger';
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
import { PubSub } from '@google-cloud/pubsub';
import { IQueue } from './queue.models';
import { Message } from '@google-cloud/pubsub/build/src/subscriber';
/**
 * Google Cloud Pub/Sub implementation of the executor queue ({@link IQueue}).
 *
 * On {@link init} it connects to Pub/Sub, caches the short names of the topics
 * and subscriptions returned by the client, makes sure the request topic and a
 * subscription of the same name exist, and starts feeding incoming messages to
 * a {@link JsInvokeMessageProcessor}. Responses are published with {@link send}.
 */
export class PubSubTemplate implements IQueue {

    private logger = _logger(`pubSubTemplate`);
    private projectId: string = config.get('pubsub.project_id');
    private credentials = JSON.parse(config.get('pubsub.service_account'));
    private queuePrefix: string = config.get('queue_prefix');
    private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
    private queueProperties: string = config.get('pubsub.queue_properties');
    private pubSubClient: PubSub;
    /** Key/value pairs parsed from `pubsub.queue_properties` ("k1:v1;k2:v2"). */
    private queueProps: { [n: string]: string } = {};
    /** Short names of topics known (or assumed) to exist. */
    private topics: string[] = [];
    /** Short names of subscriptions known (or assumed) to exist. */
    private subscriptions: string[] = [];

    name = 'Pub/Sub';

    constructor() {
    }

    /**
     * Creates the Pub/Sub client, loads the existing topic/subscription names,
     * ensures the request topic and its subscription exist and registers the
     * message listener on the request subscription.
     */
    async init(): Promise<void> {
        this.pubSubClient = new PubSub({
            projectId: this.projectId,
            credentials: this.credentials
        });

        this.parseQueueProperties();

        const topicList = await this.pubSubClient.getTopics();
        if (topicList) {
            for (const topic of topicList[0]) {
                this.topics.push(PubSubTemplate.getName(topic.name));
            }
        }

        const subscriptionList = await this.pubSubClient.getSubscriptions();
        if (subscriptionList) {
            // Iterate the subscription list itself; iterating the topic list here
            // would fill the subscription cache with topic names.
            for (const sub of subscriptionList[0]) {
                this.subscriptions.push(PubSubTemplate.getName(sub.name));
            }
        }

        await this.ensureTopicAndSubscription(this.requestTopic);

        const subscription = this.pubSubClient.subscription(this.requestTopic);
        const messageProcessor = new JsInvokeMessageProcessor(this);

        const messageHandler = (message: Message) => {
            let request;
            try {
                request = JSON.parse(message.data.toString('utf8'));
            } catch (e) {
                // A payload that is not valid JSON can never be processed: ack it so it
                // is not redelivered, and do not let the exception escape the listener.
                this.logger.error('Failed to parse Pub/Sub message, dropping it: %s', PubSubTemplate.describeError(e));
                message.ack();
                return;
            }
            messageProcessor.onJsInvokeMessage(request);
            message.ack();
        };

        subscription.on('message', messageHandler);
    }

    /**
     * Publishes a response to `responseTopic`, creating the topic and a
     * subscription of the same name first when they are not known to exist.
     *
     * @param responseTopic short name of the topic to publish to
     * @param msgKey value stored under `key` in the published JSON envelope
     * @param rawResponse response bytes, serialized as an array of numbers under `data`
     * @param headers value stored under `headers` in the published JSON envelope
     * @returns the promise returned by the client's `publishMessage`
     */
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
        await this.ensureTopicAndSubscription(responseTopic);

        const data = JSON.stringify(
            {
                key: msgKey,
                data: [...rawResponse],
                headers: headers
            });
        const dataBuffer = Buffer.from(data);
        return this.pubSubClient.topic(responseTopic).publishMessage({data: dataBuffer});
    }

    /** Ensures both the topic and the same-named subscription exist (best effort). */
    private async ensureTopicAndSubscription(topic: string): Promise<void> {
        await this.createTopic(topic);
        await this.createSubscription(topic);
    }

    /**
     * Parses `queueProperties` ("key:value" pairs separated by ';') into `queueProps`.
     * Only the first ':' separates key from value. Entries without a ':' or with an
     * empty key are skipped; keys and values are trimmed.
     */
    private parseQueueProperties() {
        if (!this.queueProperties) {
            return;
        }
        for (const entry of this.queueProperties.split(';')) {
            const delimiterPosition = entry.indexOf(':');
            if (delimiterPosition < 0) {
                continue;
            }
            const key = entry.substring(0, delimiterPosition).trim();
            if (key.length === 0) {
                continue;
            }
            this.queueProps[key] = entry.substring(delimiterPosition + 1).trim();
        }
    }

    /** Returns the part of `fullName` after the last '/', or the whole string if there is none. */
    private static getName(fullName: string): string {
        const delimiterPosition = fullName.lastIndexOf('/');
        return fullName.substring(delimiterPosition + 1);
    }

    /** True when `e` carries the gRPC status code ALREADY_EXISTS (6). */
    private static isAlreadyExists(e: unknown): boolean {
        return typeof e === 'object' && e !== null && (e as { code?: unknown }).code === 6;
    }

    /** Extracts a printable description from a caught value. */
    private static describeError(e: unknown): string {
        return e instanceof Error ? e.message : String(e);
    }

    /**
     * Creates `topic` unless it is already cached. Creation is best effort: a failure
     * is logged, not thrown, and the name is cached either way so that creation is not
     * retried on every call.
     */
    private async createTopic(topic: string) {
        if (this.topics.includes(topic)) {
            return;
        }
        try {
            await this.pubSubClient.createTopic(topic);
            this.logger.info('Created new Pub/Sub topic: %s', topic);
        } catch (e) {
            if (PubSubTemplate.isAlreadyExists(e)) {
                this.logger.info('Pub/Sub topic already exists');
            } else {
                this.logger.warn('Failed to create Pub/Sub topic %s: %s', topic, PubSubTemplate.describeError(e));
            }
        }
        // A concurrent call may have cached the name while we were awaiting.
        if (!this.topics.includes(topic)) {
            this.topics.push(topic);
        }
    }

    /**
     * Creates a subscription named `topic` on the topic `topic` unless it is already
     * cached, using `ackDeadlineInSec` and `messageRetentionInSec` from `queueProps`.
     * Best effort, with the same logging and caching policy as {@link createTopic}.
     */
    private async createSubscription(topic: string) {
        if (this.subscriptions.includes(topic)) {
            return;
        }
        try {
            await this.pubSubClient.createSubscription(topic, topic, {
                topic: topic,
                name: topic,
                ackDeadlineSeconds: Number(this.queueProps['ackDeadlineInSec']),
                messageRetentionDuration: {
                    seconds: this.queueProps['messageRetentionInSec']
                }
            });
            this.logger.info('Created new Pub/Sub subscription: %s', topic);
        } catch (e) {
            if (PubSubTemplate.isAlreadyExists(e)) {
                this.logger.info('Pub/Sub subscription already exists.');
            } else {
                this.logger.warn('Failed to create Pub/Sub subscription %s: %s', topic, PubSubTemplate.describeError(e));
            }
        }
        // A concurrent call may have cached the name while we were awaiting.
        if (!this.subscriptions.includes(topic)) {
            this.subscriptions.push(topic);
        }
    }

    /**
     * Closes the Pub/Sub client if one was created. The client reference is removed
     * before closing; errors raised while closing are logged and not rethrown.
     */
    async destroy(): Promise<void> {
        this.logger.info('Stopping Pub/Sub resources...');
        if (this.pubSubClient) {
            this.logger.info('Stopping Pub/Sub client...');
            try {
                const _pubSubClient = this.pubSubClient;
                // @ts-ignore
                delete this.pubSubClient;
                await _pubSubClient.close();
                this.logger.info('Pub/Sub client stopped.');
            } catch (e) {
                this.logger.info('Pub/Sub client stop error.');
            }
        }
        this.logger.info('Pub/Sub resources stopped.');
    }
}