AI rule node: add dedicated thread pool for AI requests

This commit is contained in:
Dmytro Skarzhynets 2025-06-11 21:17:48 +03:00
parent db5e4f8d91
commit ff23fa03c0
No known key found for this signature in database
GPG Key ID: 2B51652F224037DF
8 changed files with 160 additions and 6 deletions

View File

@ -30,6 +30,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
@ -319,6 +320,10 @@ public class ActorSystemContext {
@Getter
private AiSettingsService aiSettingsService;
@Autowired
@Getter
private AiRequestsExecutor aiRequestsExecutor;
@Autowired
@Getter
private EntityViewService entityViewService;

View File

@ -23,6 +23,7 @@ import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.MqttClientSettings;
@ -1024,6 +1025,11 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getAiSettingsService();
}
@Override
public AiRequestsExecutor getAiRequestsExecutor() {
return mainCtx.getAiRequestsExecutor();
}
@Override
public MqttClientSettings getMqttClientSettings() {
return mainCtx.getMqttClientSettings();

View File

@ -0,0 +1,43 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ai;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "actors.rule.ai-requests-thread-pool")
class AiRequestsExecutorProperties {
@NotBlank(message = "Pool name must be not blank")
private String poolName = "ai-requests";
@Min(value = 1, message = "Pool size must be at least 1")
private int poolSize = 50;
@Min(value = 1, message = "Max queue size must be at least 1")
private int maxQueueSize = 10000;
@Min(value = 1, message = "Termination timeout must be at least 1 second")
private int terminationTimeoutSeconds = 60;
}

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ai;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.ChatResponse;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import java.time.Duration;
@Lazy
@Component
@RequiredArgsConstructor
class DefaultAiRequestsExecutor implements AiRequestsExecutor {
private final AiRequestsExecutorProperties properties;
private ListeningExecutorService executorService;
@PostConstruct
private void init() {
executorService = MoreExecutors.listeningDecorator(
ThingsBoardExecutors.newLimitedTasksExecutor(properties.getPoolSize(), properties.getMaxQueueSize(), properties.getPoolName())
);
}
@Override
public FluentFuture<ChatResponse> sendChatRequestAsync(ChatModel chatModel, ChatRequest chatRequest) {
return FluentFuture.from(executorService.submit(() -> chatModel.chat(chatRequest)));
}
@PreDestroy
private void destroy() {
if (executorService != null) {
MoreExecutors.shutdownAndAwaitTermination(executorService, Duration.ofSeconds(properties.getTerminationTimeoutSeconds()));
executorService = null;
}
}
}

View File

@ -463,6 +463,16 @@ actors:
allow_system_sms_service: "${ACTORS_RULE_ALLOW_SYSTEM_SMS_SERVICE:true}"
# Specify thread pool size for external call service
external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:50}"
# Configuration for the thread pool that executes HTTP calls to AI provider APIs
ai-requests-thread-pool:
# The base name for threads
pool-name: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_NAME:ai-requests}"
# The maximum number of concurrent HTTP requests
pool-size: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_SIZE:50}"
# The maximum queue size for pending AI requests
max-queue-size: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_QUEUE_SIZE:10000}"
# The maximum time in seconds to wait for active tasks to complete during graceful shutdown
termination-timeout-seconds: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_TERMINATION_TIMEOUT_SECONDS:60}"
chain:
# Errors for particular actors are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}"

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FluentFuture;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.ChatResponse;
public interface AiRequestsExecutor {
FluentFuture<ChatResponse> sendChatRequestAsync(ChatModel chatModel, ChatRequest chatRequest);
}

View File

@ -422,6 +422,8 @@ public interface TbContext {
AiSettingsService getAiSettingsService();
AiRequestsExecutor getAiRequestsExecutor();
// Configuration parameters for the MQTT client that is used in the MQTT node and Azure IoT hub node
MqttClientSettings getMqttClientSettings();

View File

@ -26,6 +26,7 @@ import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.request.ResponseFormat;
import dev.langchain4j.model.chat.request.ResponseFormatType;
import dev.langchain4j.model.chat.request.json.JsonSchema;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.input.PromptTemplate;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.thingsboard.common.util.JacksonUtil;
@ -130,10 +131,11 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode {
.build();
configureChatModelAsync(ctx)
.transform(chatModel -> sendChatRequest(chatModel, chatRequest), ctx.getExternalCallExecutor())
.transformAsync(chatModel -> ctx.getAiRequestsExecutor().sendChatRequestAsync(chatModel, chatRequest), directExecutor())
.addCallback(new FutureCallback<>() {
@Override
public void onSuccess(String response) {
public void onSuccess(ChatResponse chatResponse) {
String response = chatResponse.aiMessage().text();
if (!isValidJsonObject(response)) {
response = wrapInJsonObject(response);
}
@ -165,10 +167,6 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode {
}, ctx.getDbCallbackExecutor());
}
private String sendChatRequest(ChatModel chatModel, ChatRequest chatRequest) {
return chatModel.chat(chatRequest).aiMessage().text();
}
private static boolean isValidJsonObject(String jsonString) {
try {
JsonNode result = JacksonUtil.toJsonNode(jsonString);