From 3c3ad1ac383ef9b7dd2eea78ebeaf17c3836e455 Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Tue, 3 Dec 2019 16:01:10 +0200 Subject: [PATCH] Improve Remote JS Invoke Service statistics --- .../service/script/RemoteJsInvokeService.java | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 4ba4efb4b3..b913889a24 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.script; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.Getter; @@ -31,11 +32,13 @@ import org.thingsboard.server.kafka.TbKafkaRequestTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbNodeIdProvider; +import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -79,6 +82,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0); private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0); private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); + private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0); @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") public void printStats() { @@ -86,8 +90,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { int invokeMsgs = kafkaInvokeMsgs.getAndSet(0); int evalMsgs = kafkaEvalMsgs.getAndSet(0); int failed = kafkaFailedMsgs.getAndSet(0); - log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}]", - kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed); + int timedOut = kafkaTimeoutMsgs.getAndSet(0); + log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]", + kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed, timedOut); } } @@ -145,16 +150,28 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { log.trace("Post compile request for scriptId [{}]", scriptId); ListenableFuture future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); kafkaPushedMsgs.incrementAndGet(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) { + kafkaEvalMsgs.incrementAndGet(); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) { + kafkaTimeoutMsgs.incrementAndGet(); + } + kafkaFailedMsgs.incrementAndGet(); + } + }); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (compilationResult.getSuccess()) { - kafkaEvalMsgs.incrementAndGet(); scriptIdToNameMap.put(scriptId, functionName); scriptIdToBodysMap.put(scriptId, scriptBody); return compiledScriptId; } else { - kafkaFailedMsgs.incrementAndGet(); log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails()); } @@ -182,16 +199,27 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setInvokeRequest(jsRequestBuilder.build()) .build(); - ListenableFuture future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); kafkaPushedMsgs.incrementAndGet(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) { + kafkaInvokeMsgs.incrementAndGet(); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) { + kafkaTimeoutMsgs.incrementAndGet(); + } + kafkaFailedMsgs.incrementAndGet(); + } + }); return Futures.transform(future, response -> { JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); if (invokeResult.getSuccess()) { - kafkaInvokeMsgs.incrementAndGet(); return invokeResult.getResult(); } else { - kafkaFailedMsgs.incrementAndGet(); log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); throw new RuntimeException(invokeResult.getErrorDetails()); }