From 5f40dcacdbd8c5524db2031a65d775bd3bf67de0 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Thu, 1 Nov 2018 18:46:18 +0200 Subject: [PATCH] Fix docker env files. Improve logging. --- .../service/script/RemoteJsInvokeService.java | 1 + .../server/kafka/TbKafkaRequestTemplate.java | 9 ++++++++- docker/.env | 2 -- docker/kafka.env | 2 +- .../server/msa/ThingsBoardDbInstaller.java | 14 +++++++++----- msa/js-executor/server.js | 18 ++++++++++++++++++ 6 files changed, 37 insertions(+), 9 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index db31bda942..05ed2853b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -136,6 +136,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setCompileRequest(jsRequest) .build(); + log.trace("Post compile request for scriptId [{}]", scriptId); ListenableFuture future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java index 77ad0331ab..b0aca0f54d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaRequestTemplate.java @@ -92,6 +92,9 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe long nextCleanupMs = 0L; while (!stopped) { ConsumerRecords responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); + if (responses.count() > 0) { + log.trace("Polling responses completed, consumer records count [{}]", responses.count()); + } responses.forEach(response -> { Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); Response decodedResponse = null; @@ -109,6 +112,7 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe if (requestId == null) { log.error("[{}] Missing requestId in header and body", response); } else { + log.trace("[{}] Response received", requestId); ResponseMetaData expectedResponse = pendingRequests.remove(requestId); if (expectedResponse == null) { log.trace("[{}] Invalid or stale request", requestId); @@ -132,6 +136,7 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe if (kv.getValue().expTime < tickTs) { ResponseMetaData staleRequest = pendingRequests.remove(kv.getKey()); if (staleRequest != null) { + log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs); staleRequest.future.setException(new TimeoutException()); } } @@ -158,8 +163,10 @@ public class TbKafkaRequestTemplate extends AbstractTbKafkaTe headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))); headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()))); SettableFuture future = SettableFuture.create(); - pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); + ResponseMetaData responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future); + pendingRequests.putIfAbsent(requestId, responseMetaData); request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); + log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime); requestTemplate.send(key, request, headers, null); return future; } diff --git a/docker/.env b/docker/.env index 0138501ebc..ef0bf40316 100644 --- a/docker/.env +++ b/docker/.env @@ -16,5 +16,3 @@ TB_VERSION=latest DATABASE=postgres LOAD_BALANCER_NAME=haproxy-certbot - -KAFKA_TOPICS="js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1" diff --git a/docker/kafka.env b/docker/kafka.env index 69fbdf6116..87dad076d2 100644 --- a/docker/kafka.env +++ b/docker/kafka.env @@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE -KAFKA_CREATE_TOPICS=${KAFKA_TOPICS} +KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1 KAFKA_AUTO_CREATE_TOPICS_ENABLE=false KAFKA_LOG_RETENTION_BYTES=1073741824 KAFKA_LOG_SEGMENT_BYTES=268435456 diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java index 0a902b9c5e..fef69b5163 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/ThingsBoardDbInstaller.java @@ -87,10 +87,17 @@ public class ThingsBoardDbInstaller extends ExternalResource { @Override protected void after() { - File tbLogsDir = new File("./target/tb-logs/"); + copyLogs(tbLogVolume, "./target/tb-logs/"); + + dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume); + dockerCompose.invokeDocker(); + } + + private void copyLogs(String volumeName, String targetDir) { + File tbLogsDir = new File(targetDir); tbLogsDir.mkdirs(); - dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + tbLogVolume + ":/root alpine tail -f /dev/null"); + dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + volumeName + ":/root alpine tail -f /dev/null"); dockerCompose.invokeDocker(); dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath()); @@ -98,9 +105,6 @@ public class ThingsBoardDbInstaller extends ExternalResource { dockerCompose.withCommand("rm -f tb-logs-container"); dockerCompose.invokeDocker(); - - dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume); - dockerCompose.invokeDocker(); } } diff --git a/msa/js-executor/server.js b/msa/js-executor/server.js index 03fac2ebe0..17f70cbf9f 100644 --- a/msa/js-executor/server.js +++ b/msa/js-executor/server.js @@ -45,6 +45,24 @@ var kafkaClient; kafkaRequestTopic ); + consumer.on('error', (err) => { + logger.error('Unexpected kafka consumer error: %s', err.message); + logger.error(err.stack); + }); + + consumer.on('offsetOutOfRange', (err) => { + logger.error('Offset out of range error: %s', err.message); + logger.error(err.stack); + }); + + consumer.on('rebalancing', () => { + logger.info('Rebalancing event received.'); + }) + + consumer.on('rebalanced', () => { + logger.info('Rebalanced event received.'); + }); + var producer = new Producer(kafkaClient); producer.on('error', (err) => { logger.error('Unexpected kafka producer error: %s', err.message);