From 64b50fa8bc859cc155d0f2465e4bac4fd5e71f64 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 27 Sep 2018 21:05:14 +0300 Subject: [PATCH] Remote Js Executor basic message processing. --- .../service/script/RemoteJsInvokeService.java | 21 ++++++++++- .../script/RuleNodeJsScriptEngine.java | 6 ++- application/src/main/proto/jsinvoke.proto | 17 ++++++--- .../server/kafka/TBKafkaConsumerTemplate.java | 12 +++++- .../server/kafka/TBKafkaProducerTemplate.java | 16 +++++++- .../server/kafka/TbKafkaEnricher.java | 24 ++++++++++++ .../kafka/TbKafkaRequestIdExtractor.java | 24 ++++++++++++ .../server/kafka/TbKafkaRequestTemplate.java | 35 +++++++++++++----- msa/js-executor/.gitignore | 2 +- msa/js-executor/api/Utils.js | 37 +++++++++++++++++++ msa/js-executor/api/jsMessageConsumer.js | 36 +++++++++++++++--- msa/js-executor/package.json | 4 +- msa/js-executor/server.js | 17 +++++---- pom.xml | 1 + 14 files changed, 217 insertions(+), 35 deletions(-) create mode 100644 common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestIdExtractor.java create mode 100644 msa/js-executor/api/Utils.js diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 2952cfa61b..a080c1c0bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -78,6 +78,22 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { requestBuilder.settings(kafkaSettings); requestBuilder.defaultTopic(requestTopic); requestBuilder.encoder(new RemoteJsRequestEncoder()); + requestBuilder.enricher((request, responseTopic, requestId) -> { + JsInvokeProtos.RemoteJsRequest.Builder remoteRequest = JsInvokeProtos.RemoteJsRequest.newBuilder(); + if (request.hasCompileRequest()) { + remoteRequest.setCompileRequest(request.getCompileRequest()); + } + if (request.hasInvokeRequest()) { + remoteRequest.setInvokeRequest(request.getInvokeRequest()); + } + if (request.hasReleaseRequest()) { + remoteRequest.setReleaseRequest(request.getReleaseRequest()); + } + remoteRequest.setResponseTopic(responseTopic); + remoteRequest.setRequestIdMSB(requestId.getMostSignificantBits()); + remoteRequest.setRequestIdLSB(requestId.getLeastSignificantBits()); + return remoteRequest.build(); + }); TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); responseBuilder.settings(kafkaSettings); @@ -87,6 +103,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { responseBuilder.autoCommit(true); responseBuilder.autoCommitIntervalMs(autoCommitInterval); responseBuilder.decoder(new RemoteJsResponseDecoder()); + responseBuilder.requestIdExtractor((response) -> { + return new UUID(response.getRequestIdMSB(), response.getRequestIdLSB()); + }); TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder builder = TbKafkaRequestTemplate.builder(); @@ -128,7 +147,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { return compiledScriptId; } else { log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); - throw new RuntimeException(compilationResult.getErrorCode().name()); + throw new RuntimeException(compilationResult.getErrorDetails()); } }); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 021e6da119..1cfb633ddf 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -48,7 +48,11 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S try { this.scriptId = this.sandboxService.eval(JsScriptType.RULE_NODE_SCRIPT, script, argNames).get(); } catch (Exception e) { - throw new IllegalArgumentException("Can't compile script: " + e.getMessage(), e); + Throwable t = e; + if (e instanceof ExecutionException) { + t = e.getCause(); + } + throw new IllegalArgumentException("Can't compile script: " + t.getMessage(), t); } } diff --git a/application/src/main/proto/jsinvoke.proto b/application/src/main/proto/jsinvoke.proto index d85c3534bb..a2afca2ab6 100644 --- a/application/src/main/proto/jsinvoke.proto +++ b/application/src/main/proto/jsinvoke.proto @@ -26,15 +26,20 @@ enum JsInvokeErrorCode { } message RemoteJsRequest { - JsCompileRequest compileRequest = 1; - JsInvokeRequest invokeRequest = 2; - JsReleaseRequest releaseRequest = 3; + string responseTopic = 1; + int64 requestIdMSB = 2; + int64 requestIdLSB = 3; + JsCompileRequest compileRequest = 4; + JsInvokeRequest invokeRequest = 5; + JsReleaseRequest releaseRequest = 6; } message RemoteJsResponse { - JsCompileResponse compileResponse = 1; - JsInvokeResponse invokeResponse = 2; - JsReleaseResponse releaseResponse = 3; + int64 requestIdMSB = 1; + int64 requestIdLSB = 2; + JsCompileResponse compileResponse = 3; + JsInvokeResponse invokeResponse = 4; + JsReleaseResponse releaseResponse = 5; } message JsCompileRequest { diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java index af786f1da1..397226479e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.UUID; /** * Created by ashvayka on 24.09.18. @@ -34,11 +35,15 @@ public class TBKafkaConsumerTemplate { private final KafkaConsumer consumer; private final TbKafkaDecoder decoder; + + @Builder.Default + private TbKafkaRequestIdExtractor requestIdExtractor = ((response) -> null); + @Getter private final String topic; @Builder - private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, + private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder decoder, TbKafkaRequestIdExtractor requestIdExtractor, String clientId, String groupId, String topic, boolean autoCommit, int autoCommitIntervalMs) { Properties props = settings.toProps(); @@ -50,6 +55,7 @@ public class TBKafkaConsumerTemplate { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); this.consumer = new KafkaConsumer<>(props); this.decoder = decoder; + this.requestIdExtractor = requestIdExtractor; this.topic = topic; } @@ -68,4 +74,8 @@ public class TBKafkaConsumerTemplate { public T decode(ConsumerRecord record) throws IOException { return decoder.decode(record.value()); } + + public UUID extractRequestId(T value) { + return requestIdExtractor.extractRequestId(value); + } } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index b39e63e1ef..097dbb8e0c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -24,9 +24,13 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.header.Header; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.Future; +import java.util.function.BiConsumer; /** * Created by ashvayka on 24.09.18. @@ -35,13 +39,18 @@ public class TBKafkaProducerTemplate { private final KafkaProducer producer; private final TbKafkaEncoder encoder; + + @Builder.Default + private TbKafkaEnricher enricher = ((value, responseTopic, requestId) -> value); + private final TbKafkaPartitioner partitioner; private final List partitionInfoList; @Getter private final String defaultTopic; @Builder - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, TbKafkaPartitioner partitioner, String defaultTopic) { + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, TbKafkaEnricher enricher, + TbKafkaPartitioner partitioner, String defaultTopic) { Properties props = settings.toProps(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); @@ -49,10 +58,15 @@ public class TBKafkaProducerTemplate { //Maybe this should not be cached, but we don't plan to change size of partitions this.partitionInfoList = producer.partitionsFor(defaultTopic); this.encoder = encoder; + this.enricher = enricher; this.partitioner = partitioner; this.defaultTopic = defaultTopic; } + public T enrich(T value, String responseTopic, UUID requestId) { + return enricher.enrich(value, responseTopic, requestId); + } + public Future send(String key, T value) { return send(key, value, null, null); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java new file mode 100644 index 0000000000..76a6f388bc --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.kafka; + +import java.util.UUID; + +public interface TbKafkaEnricher { + + T enrich(T value, String responseTopic, UUID requestId); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestIdExtractor.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestIdExtractor.java new file mode 100644 index 0000000000..4948df54dc --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestIdExtractor.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.kafka; + +import java.util.UUID; + +public interface TbKafkaRequestIdExtractor { + + UUID extractRequestId(T value); + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index a851e6bc80..38e4436a94 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -94,18 +94,34 @@ public class TbKafkaRequestTemplate { ConsumerRecords responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); responses.forEach(response -> { Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); + Response decocedResponse = null; + UUID requestId = null; if (requestIdHeader == null) { - log.error("[{}] Missing requestIdHeader", response); - } - UUID requestId = bytesToUuid(requestIdHeader.value()); - ResponseMetaData expectedResponse = pendingRequests.remove(requestId); - if (expectedResponse == null) { - log.trace("[{}] Invalid or stale request", requestId); - } else { try { - expectedResponse.future.set(responseTemplate.decode(response)); + decocedResponse = responseTemplate.decode(response); + requestId = responseTemplate.extractRequestId(decocedResponse); + } catch (IOException e) { - expectedResponse.future.setException(e); + log.error("Failed to decode response", e); + } + } else { + requestId = bytesToUuid(requestIdHeader.value()); + } + if (requestId == null) { + log.error("[{}] Missing requestId in header and response", response); + } else { + ResponseMetaData expectedResponse = pendingRequests.remove(requestId); + if (expectedResponse == null) { + log.trace("[{}] Invalid or stale request", requestId); + } else { + try { + if (decocedResponse == null) { + decocedResponse = responseTemplate.decode(response); + } + expectedResponse.future.set(decocedResponse); + } catch (IOException e) { + expectedResponse.future.setException(e); + } } } }); @@ -144,6 +160,7 @@ public class TbKafkaRequestTemplate { headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()))); SettableFuture future = SettableFuture.create(); pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); + request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); requestTemplate.send(key, request, headers); return future; } diff --git a/msa/js-executor/.gitignore b/msa/js-executor/.gitignore index 19c6dc921e..1aac6855f6 100644 --- a/msa/js-executor/.gitignore +++ b/msa/js-executor/.gitignore @@ -29,4 +29,4 @@ pom.xml.versionsBackup **/.env node_modules package-lock.json -api/jsinvoke.js +api/*.proto.js diff --git a/msa/js-executor/api/Utils.js b/msa/js-executor/api/Utils.js new file mode 100644 index 0000000000..16d98a596b --- /dev/null +++ b/msa/js-executor/api/Utils.js @@ -0,0 +1,37 @@ +/* + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +'use strict'; + +const Long = require('long'); +const uuidParse = require('uuid-parse'); + +var logger = require('../config/logger')('Utils'); + +exports.toUUIDString = function(mostSigBits, leastSigBits) { + var msbBytes = Long.fromValue(mostSigBits, false).toBytes(false); + var lsbBytes = Long.fromValue(leastSigBits, false).toBytes(false); + var uuidBytes = msbBytes.concat(lsbBytes); + var buff = new Buffer(uuidBytes, 'utf8'); + return uuidParse.unparse(uuidBytes); +} + +exports.UUIDToBits = function(uuidString) { + const bytes = uuidParse.parse(uuidString); + var msb = Long.fromBytes(bytes.slice(0,8), false, false).toString(); + var lsb = Long.fromBytes(bytes.slice(-8), false, false).toString(); + return [msb, lsb]; +} diff --git a/msa/js-executor/api/jsMessageConsumer.js b/msa/js-executor/api/jsMessageConsumer.js index 774a205bc2..39dd43aa33 100644 --- a/msa/js-executor/api/jsMessageConsumer.js +++ b/msa/js-executor/api/jsMessageConsumer.js @@ -16,9 +16,10 @@ 'use strict'; -var logger = require('../config/logger')('JsMessageConsumer'); - -var js = require('./jsinvoke').js; +const logger = require('../config/logger')('JsMessageConsumer'); +const Utils = require('./Utils'); +const js = require('./jsinvoke.proto').js; +const KeyedMessage = require('kafka-node').KeyedMessage; exports.onJsInvokeMessage = function(message, producer) { @@ -29,7 +30,15 @@ exports.onJsInvokeMessage = function(message, producer) { logger.info('Received request: %s', JSON.stringify(request)); + var requestId = getRequestId(request); + + logger.info('Received request, responseTopic: [%s]; requestId: [%s]', request.responseTopic, requestId); + if (request.compileRequest) { + var scriptId = getScriptId(request.compileRequest); + + logger.info('Received compile request, scriptId: [%s]', scriptId); + var compileResponse = js.JsCompileResponse.create( { errorCode: js.JsInvokeErrorCode.COMPILATION_ERROR, @@ -39,16 +48,31 @@ exports.onJsInvokeMessage = function(message, producer) { scriptIdMSB: request.compileRequest.scriptIdMSB } ); + const requestIdBits = Utils.UUIDToBits(requestId); var response = js.RemoteJsResponse.create( { + requestIdMSB: requestIdBits[0], + requestIdLSB: requestIdBits[1], compileResponse: compileResponse } ); var rawResponse = js.RemoteJsResponse.encode(response).finish(); - sendMessage(producer, rawResponse); + sendMessage(producer, rawResponse, request.responseTopic, scriptId); } } -function sendMessage(producer, rawMessage) { - +function sendMessage(producer, rawMessage, responseTopic, scriptId) { + const message = new KeyedMessage(scriptId, rawMessage); + const payloads = [ { topic: responseTopic, messages: rawMessage, key: scriptId } ]; + producer.send(payloads, function (err, data) { + console.log(data); + }); +} + +function getScriptId(request) { + return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); +} + +function getRequestId(request) { + return Utils.toUUIDString(request.requestIdMSB, request.requestIdLSB); } diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index 489682ed31..fc1665bcb6 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -6,7 +6,7 @@ "main": "server.js", "bin": "server.js", "scripts": { - "build-proto": "pbjs -t static-module -w commonjs -o ./api/jsinvoke.js ../../application/src/main/proto/jsinvoke.proto", + "build-proto": "pbjs -t static-module -w commonjs -o ./api/jsinvoke.proto.js ../../application/src/main/proto/jsinvoke.proto", "install": "npm run build-proto && pkg -t node8-linux-x64,node8-win-x64 --out-path ./target . && node install.js", "test": "echo \"Error: no test specified\" && exit 1", "start": "npm run build-proto && nodemon server.js", @@ -16,7 +16,9 @@ "config": "^1.30.0", "js-yaml": "^3.12.0", "kafka-node": "^3.0.1", + "long": "^4.0.0", "protobufjs": "^6.8.8", + "uuid-parse": "^1.0.0", "winston": "^3.0.0", "winston-daily-rotate-file": "^3.2.1" }, diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index b8ebec3d66..58d721b4f9 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -13,16 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -var config = require('config'), - kafka = require('kafka-node'), - Consumer = kafka.Consumer, - Producer = kafka.Producer, - JsMessageConsumer = require('./api/jsMessageConsumer'); -var logger = require('./config/logger')('main'); +const config = require('config'), + kafka = require('kafka-node'), + Consumer = kafka.Consumer, + Producer = kafka.Producer, + JsMessageConsumer = require('./api/jsMessageConsumer'); -var kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); -var kafkaRequestTopic = config.get('kafka.request_topic'); +const logger = require('./config/logger')('main'); + +const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); +const kafkaRequestTopic = config.get('kafka.request_topic'); logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); logger.info('Kafka Requests Topic: %s', kafkaRequestTopic); diff --git a/pom.xml b/pom.xml index e824177ac7..327838c5a0 100755 --- a/pom.xml +++ b/pom.xml @@ -283,6 +283,7 @@ src/main/scripts/control/** src/main/scripts/windows/** src/main/resources/public/static/rulenode/** + **/*.proto.js JAVADOC_STYLE