diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 5eb339ae02..4ba4efb4b3 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -101,22 +101,6 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { requestBuilder.clientId("producer-js-invoke-" + nodeIdProvider.getNodeId()); requestBuilder.defaultTopic(requestTopic); requestBuilder.encoder(new RemoteJsRequestEncoder()); - requestBuilder.enricher((request, responseTopic, requestId) -> { - JsInvokeProtos.RemoteJsRequest.Builder remoteRequest = JsInvokeProtos.RemoteJsRequest.newBuilder(); - if (request.hasCompileRequest()) { - remoteRequest.setCompileRequest(request.getCompileRequest()); - } - if (request.hasInvokeRequest()) { - remoteRequest.setInvokeRequest(request.getInvokeRequest()); - } - if (request.hasReleaseRequest()) { - remoteRequest.setReleaseRequest(request.getReleaseRequest()); - } - remoteRequest.setResponseTopic(responseTopic); - remoteRequest.setRequestIdMSB(requestId.getMostSignificantBits()); - remoteRequest.setRequestIdLSB(requestId.getLeastSignificantBits()); - return remoteRequest.build(); - }); TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder(); responseBuilder.settings(kafkaSettings); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java index db1a75add5..d42d221d24 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsRequestEncoder.java @@ -15,15 +15,23 @@ */ package org.thingsboard.server.service.script; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.kafka.TbKafkaEncoder; +import java.nio.charset.StandardCharsets; + /** * Created by ashvayka on 25.09.18. */ public class RemoteJsRequestEncoder implements TbKafkaEncoder { @Override public byte[] encode(JsInvokeProtos.RemoteJsRequest value) { - return value.toByteArray(); + try { + return JsonFormat.printer().print(value).getBytes(StandardCharsets.UTF_8); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java index 0ac7e6f647..8407ceaaa1 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsResponseDecoder.java @@ -15,10 +15,12 @@ */ package org.thingsboard.server.service.script; +import com.google.protobuf.util.JsonFormat; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.kafka.TbKafkaDecoder; import java.io.IOException; +import java.nio.charset.StandardCharsets; /** * Created by ashvayka on 25.09.18. @@ -27,6 +29,8 @@ public class RemoteJsResponseDecoder implements TbKafkaDecoder { private final KafkaProducer producer; private final TbKafkaEncoder encoder; - @Builder.Default - private TbKafkaEnricher enricher = ((value, responseTopic, requestId) -> value); - private final TbKafkaPartitioner partitioner; private ConcurrentMap> partitionInfoMap; @Getter @@ -61,7 +58,7 @@ public class TBKafkaProducerTemplate { private final TbKafkaSettings settings; @Builder - private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, TbKafkaEnricher enricher, + private TBKafkaProducerTemplate(TbKafkaSettings settings, TbKafkaEncoder encoder, TbKafkaPartitioner partitioner, String defaultTopic, String clientId) { Properties props = settings.toProps(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); @@ -72,7 +69,6 @@ public class TBKafkaProducerTemplate { this.settings = settings; this.producer = new KafkaProducer<>(props); this.encoder = encoder; - this.enricher = enricher; this.partitioner = partitioner; this.defaultTopic = defaultTopic; } @@ -93,14 +89,6 @@ public class TBKafkaProducerTemplate { } } - T enrich(T value, String responseTopic, UUID requestId) { - if (enricher != null) { - return enricher.enrich(value, responseTopic, requestId); - } else { - return value; - } - } - public Future send(String key, T value, Callback callback) { return send(key, value, null, callback); } diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java deleted file mode 100644 index b9226bbd38..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaEnricher.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright © 2016-2019 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.kafka; - -import java.util.UUID; - -public interface TbKafkaEnricher { - - T enrich(T value, String responseTopic, UUID requestId); - -} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index 28c1e2cba2..461866e406 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -144,11 +144,11 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe tickSize = pendingRequests.size(); if (nextCleanupMs < tickTs) { //cleanup; - pendingRequests.entrySet().forEach(kv -> { - if (kv.getValue().expTime < tickTs) { - ResponseMetaData staleRequest = pendingRequests.remove(kv.getKey()); + pendingRequests.forEach((key, value) -> { + if (value.expTime < tickTs) { + ResponseMetaData staleRequest = pendingRequests.remove(key); if (staleRequest != null) { - log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs); + log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs); staleRequest.future.setException(new TimeoutException()); } } @@ -189,13 +189,12 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe SettableFuture future = SettableFuture.create(); ResponseMetaData responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); pendingRequests.putIfAbsent(requestId, responseMetaData); - request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); requestTemplate.send(key, request, headers, (metadata, exception) -> { if (exception != null) { log.trace("[{}] Failed to post the request", requestId, exception); } else { - log.trace("[{}] Posted the request", requestId, metadata); + log.trace("[{}] Posted the request: {}", requestId, metadata); } }); return future; diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java index 24d53a846e..bef8244534 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaSettings.java @@ -32,9 +32,8 @@ import java.util.Properties; @Component public class TbKafkaSettings { - public static final String REQUEST_ID_HEADER = "requestId"; - public static final String RESPONSE_TOPIC_HEADER = "responseTopic"; - + static final String REQUEST_ID_HEADER = "requestId"; + static final String RESPONSE_TOPIC_HEADER = "responseTopic"; @Value("${kafka.bootstrap.servers}") private String servers;