Fix docker env files. Improve logging.

This commit is contained in:
Igor Kulikov 2018-11-01 18:46:18 +02:00
parent 62db6001fd
commit 5f40dcacdb
6 changed files with 37 additions and 9 deletions

View File

@ -136,6 +136,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setCompileRequest(jsRequest) .setCompileRequest(jsRequest)
.build(); .build();
log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();

View File

@ -92,6 +92,9 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
long nextCleanupMs = 0L; long nextCleanupMs = 0L;
while (!stopped) { while (!stopped) {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval)); ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
if (responses.count() > 0) {
log.trace("Polling responses completed, consumer records count [{}]", responses.count());
}
responses.forEach(response -> { responses.forEach(response -> {
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decodedResponse = null; Response decodedResponse = null;
@ -109,6 +112,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (requestId == null) { if (requestId == null) {
log.error("[{}] Missing requestId in header and body", response); log.error("[{}] Missing requestId in header and body", response);
} else { } else {
log.trace("[{}] Response received", requestId);
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId); ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) { if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId); log.trace("[{}] Invalid or stale request", requestId);
@ -132,6 +136,7 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
if (kv.getValue().expTime < tickTs) { if (kv.getValue().expTime < tickTs) {
ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey()); ResponseMetaData<Response> staleRequest = pendingRequests.remove(kv.getKey());
if (staleRequest != null) { if (staleRequest != null) {
log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", kv.getKey(), staleRequest.expTime, tickTs);
staleRequest.future.setException(new TimeoutException()); staleRequest.future.setException(new TimeoutException());
} }
} }
@ -158,8 +163,10 @@ public class TbKafkaRequestTemplate<Request, Response> extends AbstractTbKafkaTe
headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))); headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()))); headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
SettableFuture<Response> future = SettableFuture.create(); SettableFuture<Response> future = SettableFuture.create();
pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future)); ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
pendingRequests.putIfAbsent(requestId, responseMetaData);
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId); request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, key, responseMetaData.expTime);
requestTemplate.send(key, request, headers, null); requestTemplate.send(key, request, headers, null);
return future; return future;
} }

View File

@ -16,5 +16,3 @@ TB_VERSION=latest
DATABASE=postgres DATABASE=postgres
LOAD_BALANCER_NAME=haproxy-certbot LOAD_BALANCER_NAME=haproxy-certbot
KAFKA_TOPICS="js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1"

View File

@ -4,7 +4,7 @@ KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092
KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092 KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
KAFKA_CREATE_TOPICS=${KAFKA_TOPICS} KAFKA_CREATE_TOPICS=js.eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.transport.api.requests:30:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb.rule-engine:30:1
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_LOG_RETENTION_BYTES=1073741824 KAFKA_LOG_RETENTION_BYTES=1073741824
KAFKA_LOG_SEGMENT_BYTES=268435456 KAFKA_LOG_SEGMENT_BYTES=268435456

View File

@ -87,10 +87,17 @@ public class ThingsBoardDbInstaller extends ExternalResource {
@Override @Override
protected void after() { protected void after() {
File tbLogsDir = new File("./target/tb-logs/"); copyLogs(tbLogVolume, "./target/tb-logs/");
dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
dockerCompose.invokeDocker();
}
private void copyLogs(String volumeName, String targetDir) {
File tbLogsDir = new File(targetDir);
tbLogsDir.mkdirs(); tbLogsDir.mkdirs();
dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + tbLogVolume + ":/root alpine tail -f /dev/null"); dockerCompose.withCommand("run -d --rm --name tb-logs-container -v " + volumeName + ":/root alpine tail -f /dev/null");
dockerCompose.invokeDocker(); dockerCompose.invokeDocker();
dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath()); dockerCompose.withCommand("cp tb-logs-container:/root/. "+tbLogsDir.getAbsolutePath());
@ -98,9 +105,6 @@ public class ThingsBoardDbInstaller extends ExternalResource {
dockerCompose.withCommand("rm -f tb-logs-container"); dockerCompose.withCommand("rm -f tb-logs-container");
dockerCompose.invokeDocker(); dockerCompose.invokeDocker();
dockerCompose.withCommand("volume rm -f " + postgresDataVolume + " " + tbLogVolume);
dockerCompose.invokeDocker();
} }
} }

View File

@ -45,6 +45,24 @@ var kafkaClient;
kafkaRequestTopic kafkaRequestTopic
); );
consumer.on('error', (err) => {
logger.error('Unexpected kafka consumer error: %s', err.message);
logger.error(err.stack);
});
consumer.on('offsetOutOfRange', (err) => {
logger.error('Offset out of range error: %s', err.message);
logger.error(err.stack);
});
consumer.on('rebalancing', () => {
logger.info('Rebalancing event received.');
})
consumer.on('rebalanced', () => {
logger.info('Rebalanced event received.');
});
var producer = new Producer(kafkaClient); var producer = new Producer(kafkaClient);
producer.on('error', (err) => { producer.on('error', (err) => {
logger.error('Unexpected kafka producer error: %s', err.message); logger.error('Unexpected kafka producer error: %s', err.message);