Make remote js processing async

This commit is contained in:
Volodymyr Babak 2021-04-29 17:29:58 +03:00 committed by Andrew Shvayka
parent 4d2a9ea1a7
commit dac9dda9b6
2 changed files with 12 additions and 4 deletions

View File

@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg; import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
@ -39,6 +40,8 @@ import javax.annotation.PreDestroy;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -69,6 +72,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private final AtomicInteger queueEvalMsgs = new AtomicInteger(0); private final AtomicInteger queueEvalMsgs = new AtomicInteger(0);
private final AtomicInteger queueFailedMsgs = new AtomicInteger(0); private final AtomicInteger queueFailedMsgs = new AtomicInteger(0);
private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0); private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0);
private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("js-executor-remote-callback"));
public RemoteJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient) { public RemoteJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient) {
super(apiUsageStateService, apiUsageClient); super(apiUsageStateService, apiUsageClient);
@ -139,7 +144,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} }
queueFailedMsgs.incrementAndGet(); queueFailedMsgs.incrementAndGet();
} }
}, MoreExecutors.directExecutor()); }, callbackExecutor);
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse(); JsInvokeProtos.JsCompileResponse compilationResult = response.getValue().getCompileResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
@ -151,7 +156,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
throw new RuntimeException(compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails());
} }
}, MoreExecutors.directExecutor()); }, callbackExecutor);
} }
@Override @Override
@ -194,7 +199,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
} }
queueFailedMsgs.incrementAndGet(); queueFailedMsgs.incrementAndGet();
} }
}, MoreExecutors.directExecutor()); }, callbackExecutor);
return Futures.transform(future, response -> { return Futures.transform(future, response -> {
JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse(); JsInvokeProtos.JsInvokeResponse invokeResult = response.getValue().getInvokeResponse();
if (invokeResult.getSuccess()) { if (invokeResult.getSuccess()) {
@ -204,7 +209,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
throw new RuntimeException(invokeResult.getErrorDetails()); throw new RuntimeException(invokeResult.getErrorDetails());
} }
}, MoreExecutors.directExecutor()); }, callbackExecutor);
} }
@Override @Override

View File

@ -19,6 +19,9 @@ server:
address: "${HTTP_BIND_ADDRESS:0.0.0.0}" address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
# Server bind port # Server bind port
port: "${HTTP_BIND_PORT:8080}" port: "${HTTP_BIND_PORT:8080}"
tomcat:
# Maximum size of data that could be send over HTTP form POST request
max-http-form-post-size: "${MAX_HTTP_FORM_POST_SIZE:10000000}" # 10Mb
# Server SSL configuration # Server SSL configuration
ssl: ssl:
# Enable/disable SSL support # Enable/disable SSL support