Improve Remote JS Invoke Service statistics

This commit is contained in:
Igor Kulikov 2019-12-03 16:01:10 +02:00
parent 562917649c
commit 3c3ad1ac38

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.script; package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.Getter; import lombok.Getter;
@ -31,11 +32,13 @@ import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider; import org.thingsboard.server.kafka.TbNodeIdProvider;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -79,6 +82,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0); private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0);
private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0); private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0);
private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0);
private final AtomicInteger kafkaTimeoutMsgs = new AtomicInteger(0);
@Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}")
public void printStats() { public void printStats() {
@ -86,8 +90,9 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
int invokeMsgs = kafkaInvokeMsgs.getAndSet(0); int invokeMsgs = kafkaInvokeMsgs.getAndSet(0);
int evalMsgs = kafkaEvalMsgs.getAndSet(0); int evalMsgs = kafkaEvalMsgs.getAndSet(0);
int failed = kafkaFailedMsgs.getAndSet(0); int failed = kafkaFailedMsgs.getAndSet(0);
log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}]", int timedOut = kafkaTimeoutMsgs.getAndSet(0);
kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed); log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]",
kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed, timedOut);
} }
} }
@ -145,16 +150,28 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
log.trace("Post compile request for scriptId [{}]", scriptId); log.trace("Post compile request for scriptId [{}]", scriptId);
ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
kafkaPushedMsgs.incrementAndGet(); kafkaPushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<JsInvokeProtos.RemoteJsResponse>() {
@Override
public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) {
kafkaEvalMsgs.incrementAndGet();
}
@Override
public void onFailure(Throwable t) {
if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
kafkaTimeoutMsgs.incrementAndGet();
}
kafkaFailedMsgs.incrementAndGet();
}
});
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
if (compilationResult.getSuccess()) { if (compilationResult.getSuccess()) {
kafkaEvalMsgs.incrementAndGet();
scriptIdToNameMap.put(scriptId, functionName); scriptIdToNameMap.put(scriptId, functionName);
scriptIdToBodysMap.put(scriptId, scriptBody); scriptIdToBodysMap.put(scriptId, scriptBody);
return compiledScriptId; return compiledScriptId;
} else { } else {
kafkaFailedMsgs.incrementAndGet();
log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
throw new RuntimeException(compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails());
} }
@ -182,16 +199,27 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
.setInvokeRequest(jsRequestBuilder.build()) .setInvokeRequest(jsRequestBuilder.build())
.build(); .build();
ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); ListenableFuture<JsInvokeProtos.RemoteJsResponse> future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper);
kafkaPushedMsgs.incrementAndGet(); kafkaPushedMsgs.incrementAndGet();
Futures.addCallback(future, new FutureCallback<JsInvokeProtos.RemoteJsResponse>() {
@Override
public void onSuccess(@Nullable JsInvokeProtos.RemoteJsResponse result) {
kafkaInvokeMsgs.incrementAndGet();
}
@Override
public void onFailure(Throwable t) {
if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
kafkaTimeoutMsgs.incrementAndGet();
}
kafkaFailedMsgs.incrementAndGet();
}
});
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse();
if (invokeResult.getSuccess()) { if (invokeResult.getSuccess()) {
kafkaInvokeMsgs.incrementAndGet();
return invokeResult.getResult(); return invokeResult.getResult();
} else { } else {
kafkaFailedMsgs.incrementAndGet();
log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
throw new RuntimeException(invokeResult.getErrorDetails()); throw new RuntimeException(invokeResult.getErrorDetails());
} }