diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 8d1f9d662a..baf54bfe2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -46,17 +46,17 @@ import java.util.concurrent.atomic.AtomicInteger; @Service public class RemoteJsInvokeService extends AbstractJsInvokeService { - @Value("${js.remote.max_requests_timeout}") + @Value("${queue.js.max_requests_timeout}") private long maxRequestsTimeout; @Getter - @Value("${js.remote.max_errors}") +// @Value("${queue.js.max_errors}") private int maxErrors; - @Value("${js.remote.max_black_list_duration_sec:60}") + @Value("${queue.js.max_black_list_duration_sec:60}") private int maxBlackListDurationSec; - @Value("${js.remote.stats.enabled:false}") + @Value("${queue.js.stats.enabled:false}") private boolean statsEnabled; private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); @@ -65,7 +65,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0); - @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") +// @Scheduled(fixedDelayString = "${queue.js.stats.print_interval_ms}") public void printStats() { if (statsEnabled) { int pushedMsgs = kafkaPushedMsgs.getAndSet(0); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e1296275cd..89ac618d6d 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -415,7 +415,7 @@ state: persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" js: - evaluator: "${JS_EVALUATOR:local}" # local/remote + evaluator: "${JS_EVALUATOR:remote}" # local/remote # Built-in JVM JavaScript environment properties local: # Use Sandboxed (secured) JVM JavaScript environment @@ -582,9 +582,9 @@ queue: print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" js: # JS Eval request topic - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}" + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" # JS Eval responses topic prefix that is combined with node id - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js_eval.responses}" + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" # JS Eval max pending requests max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" # JS Eval max request timeout diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js index f0facf8cc1..4cf20b4227 100644 --- a/msa/js-executor/api/jsInvokeMessageProcessor.js +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -31,6 +31,7 @@ const useSandbox = config.get('script.use_sandbox') === 'true'; const maxActiveScripts = Number(config.get('script.max_active_scripts')); function JsInvokeMessageProcessor(producer) { + console.log("Kafka Producer:", producer); this.producer = producer; this.executor = new JsExecutor(useSandbox); this.scriptMap = {}; @@ -144,17 +145,17 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, r JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) { var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); - this.producer.send( - { - topic: responseTopic, - messages: [ - { - key: scriptId, - value: rawResponse, - headers: headers - } - ] - } + this.producer.send(responseTopic, scriptId, rawResponse, headers + // { + // topic: responseTopic, + // messages: [ + // { + // key: scriptId, + // value: rawResponse, + // headers: headers + // } + // ] + // } ).then( () => {}, (err) => { diff --git a/msa/js-executor/config/custom-environment-variables.yml b/msa/js-executor/config/custom-environment-variables.yml index 585dfe8adb..77e68b0429 100644 --- a/msa/js-executor/config/custom-environment-variables.yml +++ b/msa/js-executor/config/custom-environment-variables.yml @@ -14,6 +14,8 @@ # limitations under the License. # +service-type: "TB_SERVICE_TYPE" + kafka: request_topic: "REMOTE_JS_EVAL_REQUEST_TOPIC" bootstrap: diff --git a/msa/js-executor/config/default.yml b/msa/js-executor/config/default.yml index 1290a8a429..724901d171 100644 --- a/msa/js-executor/config/default.yml +++ b/msa/js-executor/config/default.yml @@ -14,6 +14,8 @@ # limitations under the License. # +service-type: "kafka" + kafka: request_topic: "js.eval.requests" bootstrap: diff --git a/msa/js-executor/queue/kafka/kafkaTemplate.js b/msa/js-executor/queue/kafka/kafkaTemplate.js new file mode 100644 index 0000000000..bb3f81c48d --- /dev/null +++ b/msa/js-executor/queue/kafka/kafkaTemplate.js @@ -0,0 +1,117 @@ +/* + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +const { logLevel, Kafka } = require('kafkajs'); + +const config = require('config'), + JsInvokeMessageProcessor = require('../../api/jsInvokeMessageProcessor'), + logger = require('../../config/logger')._logger('main'), + KafkaJsWinstonLogCreator = require('../../config/logger').KafkaJsWinstonLogCreator; + +var kafkaClient; +var consumer; +var producer; + +function KafkaProducer() { + this.send = async (responseTopic, scriptId, rawResponse, headers) => { + return producer.send( + { + topic: responseTopic, + messages: [ + { + key: scriptId, + value: rawResponse, + headers: headers + } + ] + }); + } +} + +(async() => { + try { + logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + + const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); + const kafkaRequestTopic = config.get('kafka.request_topic'); + + logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); + + kafkaClient = new Kafka({ + brokers: kafkaBootstrapServers.split(','), + logLevel: logLevel.INFO, + logCreator: KafkaJsWinstonLogCreator + }); + + consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); + producer = kafkaClient.producer(); + const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); + await consumer.connect(); + await producer.connect(); + await consumer.subscribe({ topic: kafkaRequestTopic}); + + logger.info('Started ThingsBoard JavaScript Executor Microservice.'); + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + messageProcessor.onJsInvokeMessage(message); + }, + }); + + } catch (e) { + logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); + logger.error(e.stack); + exit(-1); + } +})(); + +process.on('exit', () => { + exit(0); +}); + +async function exit(status) { + logger.info('Exiting with status: %d ...', status); + if (consumer) { + logger.info('Stopping Kafka Consumer...'); + var _consumer = consumer; + consumer = null; + try { + await _consumer.disconnect(); + logger.info('Kafka Consumer stopped.'); + await disconnectProducer(); + process.exit(status); + } catch (e) { + logger.info('Kafka Consumer stop error.'); + await disconnectProducer(); + process.exit(status); + } + } else { + process.exit(status); + } +} + +async function disconnectProducer() { + if (producer) { + logger.info('Stopping Kafka Producer...'); + var _producer = producer; + producer = null; + try { + await _producer.disconnect(); + logger.info('Kafka Producer stopped.'); + } catch (e) { + logger.info('Kafka Producer stop error.'); + } + } +} diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index f56e5bb766..486f9dc187 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -13,89 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -const { logLevel, Kafka } = require('kafkajs'); -const config = require('config'), - JsInvokeMessageProcessor = require('./api/jsInvokeMessageProcessor'), - logger = require('./config/logger')._logger('main'), - KafkaJsWinstonLogCreator = require('./config/logger').KafkaJsWinstonLogCreator; +const config = require('config'); -var kafkaClient; -var consumer; -var producer; - -(async() => { - try { - logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); - - const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); - const kafkaRequestTopic = config.get('kafka.request_topic'); - - logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); - logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); - - kafkaClient = new Kafka({ - brokers: kafkaBootstrapServers.split(','), - logLevel: logLevel.INFO, - logCreator: KafkaJsWinstonLogCreator - }); - - consumer = kafkaClient.consumer({ groupId: 'js-executor-group' }); - producer = kafkaClient.producer(); - const messageProcessor = new JsInvokeMessageProcessor(producer); - await consumer.connect(); - await producer.connect(); - await consumer.subscribe({ topic: kafkaRequestTopic}); - - logger.info('Started ThingsBoard JavaScript Executor Microservice.'); - await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - messageProcessor.onJsInvokeMessage(message); - }, - }); - - } catch (e) { - logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - logger.error(e.stack); - exit(-1); - } -})(); - -process.on('exit', () => { - exit(0); -}); - -async function exit(status) { - logger.info('Exiting with status: %d ...', status); - if (consumer) { - logger.info('Stopping Kafka Consumer...'); - var _consumer = consumer; - consumer = null; - try { - await _consumer.disconnect(); - logger.info('Kafka Consumer stopped.'); - await disconnectProducer(); - process.exit(status); - } catch (e) { - logger.info('Kafka Consumer stop error.'); - await disconnectProducer(); - process.exit(status); - } - } else { - process.exit(status); - } +const serviceType = config.get('service-type'); +switch (serviceType) { + case 'kafka': + require('./queue/kafka/kafkaTemplate'); + console.log('Used kafka template.'); + break; + default: + console.error('Unknown service type: ', serviceType); + process.exit(-1); } -async function disconnectProducer() { - if (producer) { - logger.info('Stopping Kafka Producer...'); - var _producer = producer; - producer = null; - try { - await _producer.disconnect(); - logger.info('Kafka Producer stopped.'); - } catch (e) { - logger.info('Kafka Producer stop error.'); - } - } -}