diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index a080c1c0bd..37a62117be 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -166,7 +166,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setScriptBody(scriptIdToBodysMap.get(scriptId)); for (int i = 0; i < args.length; i++) { - jsRequestBuilder.setArgs(i, args[i].toString()); + jsRequestBuilder.addArgs(args[i].toString()); } JsInvokeProtos.RemoteJsRequest jsRequestWrapper = JsInvokeProtos.RemoteJsRequest.newBuilder() @@ -180,7 +180,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { return invokeResult.getResult(); } else { log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); - throw new RuntimeException(invokeResult.getErrorCode().name()); + throw new RuntimeException(invokeResult.getErrorDetails()); } }); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 1cfb633ddf..2a9805156f 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -174,6 +174,8 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { throw (ScriptException)e.getCause(); + } else if (e.getCause() instanceof RuntimeException) { + throw new ScriptException(e.getCause().getMessage()); } else { throw new ScriptException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeScriptFactory.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeScriptFactory.java index 5cc9c55b6f..ef2b5aae15 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeScriptFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeScriptFactory.java @@ -28,7 +28,7 @@ public class RuleNodeScriptFactory { " var metadata = JSON.parse(metadataStr); " + " return JSON.stringify(%s(msg, metadata, msgType));" + " function %s(%s, %s, %s) {"; - private static final String JS_WRAPPER_SUFFIX = "}" + + private static final String JS_WRAPPER_SUFFIX = "\n}" + "\n}"; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index b7d0cc15a2..a6d42eb71c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -407,7 +407,7 @@ state: kafka: enabled: true - bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" + bootstrap.servers: "${TB_KAFKA_SERVERS:192.168.2.157:9092}" acks: "${TB_KAFKA_ACKS:all}" retries: "${TB_KAFKA_RETRIES:1}" batch.size: "${TB_KAFKA_BATCH_SIZE:16384}" diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 097dbb8e0c..1f0d3ad20e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -17,6 +17,9 @@ package org.thingsboard.server.kafka; import lombok.Builder; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -35,6 +38,7 @@ import java.util.function.BiConsumer; /** * Created by ashvayka on 24.09.18. */ +@Slf4j public class TBKafkaProducerTemplate { private final KafkaProducer producer; @@ -44,7 +48,7 @@ public class TBKafkaProducerTemplate { private TbKafkaEnricher enricher = ((value, responseTopic, requestId) -> value); private final TbKafkaPartitioner partitioner; - private final List partitionInfoList; + private List partitionInfoList; @Getter private final String defaultTopic; @@ -55,14 +59,24 @@ public class TBKafkaProducerTemplate { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); this.producer = new KafkaProducer<>(props); - //Maybe this should not be cached, but we don't plan to change size of partitions - this.partitionInfoList = producer.partitionsFor(defaultTopic); this.encoder = encoder; this.enricher = enricher; this.partitioner = partitioner; this.defaultTopic = defaultTopic; } + public void init() { + try { + TBKafkaAdmin admin = new TBKafkaAdmin(); + CreateTopicsResult result = admin.createTopic(new NewTopic(defaultTopic, 100, (short) 1)); + result.all().get(); + } catch (Exception e) { + log.trace("Failed to create topic: {}", e.getMessage(), e); + } + //Maybe this should not be cached, but we don't plan to change size of partitions + this.partitionInfoList = producer.partitionsFor(defaultTopic); + } + public T enrich(T value, String responseTopic, UUID requestId) { return enricher.enrich(value, responseTopic, requestId); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index 38e4436a94..17b44597c7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; @@ -33,11 +34,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; /** * Created by ashvayka on 25.09.18. @@ -86,6 +83,7 @@ public class TbKafkaRequestTemplate { } catch (Exception e) { log.trace("Failed to create topic: {}", e.getMessage(), e); } + this.requestTemplate.init(); tickTs = System.currentTimeMillis(); responseTemplate.subscribe(); executor.submit(() -> { diff --git a/msa/docker/docker-compose.yml b/msa/docker/docker-compose.yml new file mode 100644 index 0000000000..a077c0ee1a --- /dev/null +++ b/msa/docker/docker-compose.yml @@ -0,0 +1,46 @@ +# +# Copyright © 2016-2018 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +version: '2' + +services: + zookeeper: + image: "wurstmeister/zookeeper" + ports: + - "2181" + kafka: + image: "wurstmeister/kafka" + ports: + - "9092:9092" + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092 + KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://${KAFKA_HOSTNAME}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE + KAFKA_CREATE_TOPICS: "${KAFKA_TOPICS}" + KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' + depends_on: + - zookeeper + tb-js-executor: + image: "local-maven-build/tb-js-executor:latest" + environment: + TB_KAFKA_SERVERS: kafka:9092 + env_file: + - tb-js-executor.env + depends_on: + - kafka diff --git a/msa/docker/tb-js-executor.env b/msa/docker/tb-js-executor.env new file mode 100644 index 0000000000..f562c3aa28 --- /dev/null +++ b/msa/docker/tb-js-executor.env @@ -0,0 +1,7 @@ + +REMOTE_JS_EVAL_REQUEST_TOPIC=js.eval.requests +TB_KAFKA_SERVERS=localhost:9092 +LOGGER_LEVEL=debug +LOG_FOLDER=logs +LOGGER_FILENAME=tb-js-executor-%DATE%.log +DOCKER_MODE=true \ No newline at end of file diff --git a/msa/js-executor/api/jsExecutor.js b/msa/js-executor/api/jsExecutor.js new file mode 100644 index 0000000000..18d3f6cf13 --- /dev/null +++ b/msa/js-executor/api/jsExecutor.js @@ -0,0 +1,48 @@ +/* + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'use strict'; + +const vm = require('vm'); + +function JsExecutor() { +} + +JsExecutor.prototype.compileScript = function(code) { + return new Promise(function(resolve, reject) { + try { + code = "("+code+")(...args)"; + var script = new vm.Script(code); + resolve(script); + } catch (err) { + reject(err); + } + }); +} + +JsExecutor.prototype.executeScript = function(script, args, timeout) { + return new Promise(function(resolve, reject) { + try { + var sandbox = Object.create(null); + sandbox.args = args; + var result = script.runInNewContext(sandbox, {timeout: timeout}); + resolve(result); + } catch (err) { + reject(err); + } + }); +} + +module.exports = JsExecutor; diff --git a/msa/js-executor/api/jsInvokeMessageProcessor.js b/msa/js-executor/api/jsInvokeMessageProcessor.js new file mode 100644 index 0000000000..ef81adfc97 --- /dev/null +++ b/msa/js-executor/api/jsInvokeMessageProcessor.js @@ -0,0 +1,226 @@ +/* + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const logger = require('../config/logger')('JsInvokeMessageProcessor'), + Utils = require('./utils'), + js = require('./jsinvoke.proto').js, + KeyedMessage = require('kafka-node').KeyedMessage, + JsExecutor = require('./jsExecutor'); + +function JsInvokeMessageProcessor(producer) { + this.producer = producer; + this.executor = new JsExecutor(); + this.scriptMap = {}; +} + +JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) { + + var requestId; + try { + var request = js.RemoteJsRequest.decode(message.value); + requestId = getRequestId(request); + + logger.debug('[%s] Received request, responseTopic: [%s]', requestId, request.responseTopic); + + if (request.compileRequest) { + this.processCompileRequest(requestId, request.responseTopic, request.compileRequest); + } else if (request.invokeRequest) { + this.processInvokeRequest(requestId, request.responseTopic, request.invokeRequest); + } else if (request.releaseRequest) { + this.processReleaseRequest(requestId, request.responseTopic, request.releaseRequest); + } else { + logger.error('[%s] Unknown request recevied!', requestId); + } + + } catch (err) { + logger.error('[%s] Failed to process request: %s', requestId, err.message); + logger.error(err.stack); + } +} + +JsInvokeMessageProcessor.prototype.processCompileRequest = function(requestId, responseTopic, compileRequest) { + var scriptId = getScriptId(compileRequest); + logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); + + this.executor.compileScript(compileRequest.scriptBody).then( + (script) => { + this.scriptMap[scriptId] = script; + var compileResponse = createCompileResponse(scriptId, true); + logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + }, + (err) => { + var compileResponse = createCompileResponse(scriptId, false, js.JsInvokeErrorCode.COMPILATION_ERROR, err); + logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, scriptId, compileResponse); + } + ); +} + +JsInvokeMessageProcessor.prototype.processInvokeRequest = function(requestId, responseTopic, invokeRequest) { + var scriptId = getScriptId(invokeRequest); + logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); + this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then( + (script) => { + this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then( + (result) => { + var invokeResponse = createInvokeResponse(result, true); + logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + }, + (err) => { + var errorCode; + if (err.message.includes('Script execution timed out')) { + errorCode = js.JsInvokeErrorCode.TIMEOUT_ERROR; + } else { + errorCode = js.JsInvokeErrorCode.RUNTIME_ERROR; + } + var invokeResponse = createInvokeResponse("", false, errorCode, err); + logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + } + ) + }, + (err) => { + var invokeResponse = createInvokeResponse("", false, js.JsInvokeErrorCode.COMPILATION_ERROR, err); + logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, js.JsInvokeErrorCode.COMPILATION_ERROR); + this.sendResponse(requestId, responseTopic, scriptId, null, invokeResponse); + } + ); +} + +JsInvokeMessageProcessor.prototype.processReleaseRequest = function(requestId, responseTopic, releaseRequest) { + var scriptId = getScriptId(releaseRequest); + logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); + if (this.scriptMap[scriptId]) { + delete this.scriptMap[scriptId]; + } + var releaseResponse = createReleaseResponse(scriptId, true); + logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); + this.sendResponse(requestId, responseTopic, scriptId, null, null, releaseResponse); +} + +JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, scriptId, compileResponse, invokeResponse, releaseResponse) { + var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); + var rawResponse = js.RemoteJsResponse.encode(remoteResponse).finish(); + const message = new KeyedMessage(scriptId, rawResponse); + const payloads = [ { topic: responseTopic, messages: message, key: scriptId } ]; + this.producer.send(payloads, function (err, data) { + if (err) { + logger.error('[%s] Failed to send response to kafka: %s', requestId, err.message); + logger.error(err.stack); + } + }); +} + +JsInvokeMessageProcessor.prototype.getOrCompileScript = function(scriptId, scriptBody) { + var self = this; + return new Promise(function(resolve, reject) { + if (self.scriptMap[scriptId]) { + resolve(self.scriptMap[scriptId]); + } else { + self.executor.compileScript(scriptBody).then( + (script) => { + self.scriptMap[scriptId] = script; + resolve(script); + }, + (err) => { + reject(err); + } + ); + } + }); +} + +function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) { + const requestIdBits = Utils.UUIDToBits(requestId); + return js.RemoteJsResponse.create( + { + requestIdMSB: requestIdBits[0], + requestIdLSB: requestIdBits[1], + compileResponse: compileResponse, + invokeResponse: invokeResponse, + releaseResponse: releaseResponse + } + ); +} + +function createCompileResponse(scriptId, success, errorCode, err) { + const scriptIdBits = Utils.UUIDToBits(scriptId); + return js.JsCompileResponse.create( + { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1] + } + ); +} + +function createInvokeResponse(result, success, errorCode, err) { + return js.JsInvokeResponse.create( + { + errorCode: errorCode, + success: success, + errorDetails: parseJsErrorDetails(err), + result: result + } + ); +} + +function createReleaseResponse(scriptId, success) { + const scriptIdBits = Utils.UUIDToBits(scriptId); + return js.JsReleaseResponse.create( + { + success: success, + scriptIdMSB: scriptIdBits[0], + scriptIdLSB: scriptIdBits[1] + } + ); +} + +function parseJsErrorDetails(err) { + if (!err) { + return ''; + } + var details = err.name + ': ' + err.message; + if (err.stack) { + var lines = err.stack.split('\n'); + if (lines && lines.length) { + var line = lines[0]; + var splitted = line.split(':'); + if (splitted && splitted.length === 2) { + if (!isNaN(splitted[1])) { + details += ' in at line number ' + splitted[1]; + } + } + } + } + return details; +} + +function getScriptId(request) { + return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); +} + +function getRequestId(request) { + return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB); +} + +module.exports = JsInvokeMessageProcessor; \ No newline at end of file diff --git a/msa/js-executor/api/jsMessageConsumer.js b/msa/js-executor/api/jsMessageConsumer.js deleted file mode 100644 index 39dd43aa33..0000000000 --- a/msa/js-executor/api/jsMessageConsumer.js +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -'use strict'; - -const logger = require('../config/logger')('JsMessageConsumer'); -const Utils = require('./Utils'); -const js = require('./jsinvoke.proto').js; -const KeyedMessage = require('kafka-node').KeyedMessage; - - -exports.onJsInvokeMessage = function(message, producer) { - - logger.info('Received message: %s', JSON.stringify(message)); - - var request = js.RemoteJsRequest.decode(message.value); - - logger.info('Received request: %s', JSON.stringify(request)); - - var requestId = getRequestId(request); - - logger.info('Received request, responseTopic: [%s]; requestId: [%s]', request.responseTopic, requestId); - - if (request.compileRequest) { - var scriptId = getScriptId(request.compileRequest); - - logger.info('Received compile request, scriptId: [%s]', scriptId); - - var compileResponse = js.JsCompileResponse.create( - { - errorCode: js.JsInvokeErrorCode.COMPILATION_ERROR, - success: false, - errorDetails: 'Not Implemented!', - scriptIdLSB: request.compileRequest.scriptIdLSB, - scriptIdMSB: request.compileRequest.scriptIdMSB - } - ); - const requestIdBits = Utils.UUIDToBits(requestId); - var response = js.RemoteJsResponse.create( - { - requestIdMSB: requestIdBits[0], - requestIdLSB: requestIdBits[1], - compileResponse: compileResponse - } - ); - var rawResponse = js.RemoteJsResponse.encode(response).finish(); - sendMessage(producer, rawResponse, request.responseTopic, scriptId); - } -} - -function sendMessage(producer, rawMessage, responseTopic, scriptId) { - const message = new KeyedMessage(scriptId, rawMessage); - const payloads = [ { topic: responseTopic, messages: rawMessage, key: scriptId } ]; - producer.send(payloads, function (err, data) { - console.log(data); - }); -} - -function getScriptId(request) { - return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); -} - -function getRequestId(request) { - return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB); -} diff --git a/msa/js-executor/api/Utils.js b/msa/js-executor/api/utils.js similarity index 94% rename from msa/js-executor/api/Utils.js rename to msa/js-executor/api/utils.js index 16d98a596b..2bb31267f8 100644 --- a/msa/js-executor/api/Utils.js +++ b/msa/js-executor/api/utils.js @@ -16,8 +16,8 @@ 'use strict'; -const Long = require('long'); -const uuidParse = require('uuid-parse'); +const Long = require('long'), + uuidParse = require('uuid-parse'); var logger = require('../config/logger')('Utils'); diff --git a/msa/js-executor/config/logger.js b/msa/js-executor/config/logger.js index 86df82777d..695b453763 100644 --- a/msa/js-executor/config/logger.js +++ b/msa/js-executor/config/logger.js @@ -22,7 +22,7 @@ const { combine, timestamp, label, printf, splat } = format; var loggerTransports = []; -if (process.env.NODE_ENV !== 'production') { +if (process.env.NODE_ENV !== 'production' || process.env.DOCKER_MODE === 'true') { loggerTransports.push(new transports.Console({ handleExceptions: true })); diff --git a/msa/js-executor/docker/Dockerfile b/msa/js-executor/docker/Dockerfile new file mode 100644 index 0000000000..2b62eb33ae --- /dev/null +++ b/msa/js-executor/docker/Dockerfile @@ -0,0 +1,26 @@ +# +# Copyright © 2016-2018 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM debian:stretch + +COPY start-js-executor.sh ${pkg.name}.deb /tmp/ + +RUN chmod a+x /tmp/*.sh \ + && mv /tmp/start-js-executor.sh /usr/bin + +RUN dpkg -i /tmp/${pkg.name}.deb + +CMD ["start-js-executor.sh"] diff --git a/msa/js-executor/docker/start-js-executor.sh b/msa/js-executor/docker/start-js-executor.sh new file mode 100755 index 0000000000..af7c686a11 --- /dev/null +++ b/msa/js-executor/docker/start-js-executor.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# +# Copyright © 2016-2018 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +echo "Starting '${project.name}' ..." + +CONF_FOLDER="${pkg.installFolder}/conf" + +mainfile=${pkg.installFolder}/bin/${pkg.name} +configfile=${pkg.name}.conf +identity=${pkg.name} + +source "${CONF_FOLDER}/${configfile}" + +su -s /bin/sh -c "$mainfile" diff --git a/msa/js-executor/pom.xml b/msa/js-executor/pom.xml index 2dd03a4a8b..51416f5aa3 100644 --- a/msa/js-executor/pom.xml +++ b/msa/js-executor/pom.xml @@ -40,6 +40,7 @@ /usr/share/${pkg.name} ${project.build.directory}/package/linux ${project.build.directory}/package/windows + true @@ -211,6 +212,22 @@ + + copy-docker-config + process-resources + + copy-resources + + + ${project.build.directory} + + + docker + true + + + + @@ -260,6 +277,27 @@ + + com.spotify + dockerfile-maven-plugin + 1.4.4 + + + build-docker-image + pre-integration-test + + build + + + + + ${dockerfile.skip} + local-maven-build/${pkg.name} + true + false + ${project.build.directory} + + diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index 58d721b4f9..03fac2ebe0 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -16,17 +16,10 @@ const config = require('config'), kafka = require('kafka-node'), - Consumer = kafka.Consumer, + ConsumerGroup = kafka.ConsumerGroup, Producer = kafka.Producer, - JsMessageConsumer = require('./api/jsMessageConsumer'); - -const logger = require('./config/logger')('main'); - -const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); -const kafkaRequestTopic = config.get('kafka.request_topic'); - -logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); -logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); + JsInvokeMessageProcessor = require('./api/jsInvokeMessageProcessor'), + logger = require('./config/logger')('main'); var kafkaClient; @@ -34,35 +27,42 @@ var kafkaClient; try { logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); + const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); + const kafkaRequestTopic = config.get('kafka.request_topic'); + + logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); + logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); + kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaBootstrapServers}); - var consumer = new Consumer( - kafkaClient, - [ - { topic: kafkaRequestTopic, partition: 0 } - ], + var consumer = new ConsumerGroup( { + kafkaHost: kafkaBootstrapServers, + groupId: 'js-executor-group', autoCommit: true, encoding: 'buffer' - } + }, + kafkaRequestTopic ); var producer = new Producer(kafkaClient); producer.on('error', (err) => { - logger.error('Unexpected kafka producer error'); - logger.error(err); + logger.error('Unexpected kafka producer error: %s', err.message); + logger.error(err.stack); }); + var messageProcessor = new JsInvokeMessageProcessor(producer); + producer.on('ready', () => { consumer.on('message', (message) => { - JsMessageConsumer.onJsInvokeMessage(message, producer); + messageProcessor.onJsInvokeMessage(message); }); + logger.info('Started ThingsBoard JavaScript Executor Microservice.'); }); - logger.info('Started ThingsBoard JavaScript Executor Microservice.'); } catch (e) { logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); - logger.error(e); + logger.error(e.stack); exit(-1); } })(); @@ -75,11 +75,13 @@ function exit(status) { logger.info('Exiting with status: %d ...', status); if (kafkaClient) { logger.info('Stopping Kafka Client...'); - kafkaClient.close(() => { + var _kafkaClient = kafkaClient; + kafkaClient = null; + _kafkaClient.close(() => { logger.info('Kafka Client stopped.'); process.exit(status); }); } else { process.exit(status); } -} \ No newline at end of file +} diff --git a/msa/js-executor/src/main/scripts/control/deb/postinst b/msa/js-executor/src/main/scripts/control/deb/postinst index 0767d3f2c7..4c48c5508a 100644 --- a/msa/js-executor/src/main/scripts/control/deb/postinst +++ b/msa/js-executor/src/main/scripts/control/deb/postinst @@ -2,5 +2,5 @@ chown -R ${pkg.user}: ${pkg.logFolder} chown -R ${pkg.user}: ${pkg.installFolder} -update-rc.d ${pkg.name} defaults +# update-rc.d ${pkg.name} defaults