From ff23fa03c0e9c257a3219997d2662194a7aac532 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Wed, 11 Jun 2025 21:17:48 +0300 Subject: [PATCH] AI rule node: add dedicated thread pool for AI requests --- .../server/actors/ActorSystemContext.java | 5 ++ .../actors/ruleChain/DefaultTbContext.java | 6 ++ .../ai/AiRequestsExecutorProperties.java | 43 +++++++++++++ .../service/ai/DefaultAiRequestsExecutor.java | 63 +++++++++++++++++++ .../src/main/resources/thingsboard.yml | 10 +++ .../rule/engine/api/AiRequestsExecutor.java | 27 ++++++++ .../rule/engine/api/TbContext.java | 2 + .../thingsboard/rule/engine/ai/TbAiNode.java | 10 ++- 8 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/ai/AiRequestsExecutorProperties.java create mode 100644 application/src/main/java/org/thingsboard/server/service/ai/DefaultAiRequestsExecutor.java create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AiRequestsExecutor.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index e420d6a6f6..5fc7247061 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -30,6 +30,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AiRequestsExecutor; import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MqttClientSettings; @@ -319,6 +320,10 @@ public class ActorSystemContext { @Getter private AiSettingsService aiSettingsService; + @Autowired + @Getter + private AiRequestsExecutor aiRequestsExecutor; + @Autowired @Getter private EntityViewService entityViewService; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3aba1f5379..9df08b7b87 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -23,6 +23,7 @@ import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.api.AiRequestsExecutor; import org.thingsboard.rule.engine.api.DeviceStateManager; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MqttClientSettings; @@ -1024,6 +1025,11 @@ public class DefaultTbContext implements TbContext { return mainCtx.getAiSettingsService(); } + @Override + public AiRequestsExecutor getAiRequestsExecutor() { + return mainCtx.getAiRequestsExecutor(); + } + @Override public MqttClientSettings getMqttClientSettings() { return mainCtx.getMqttClientSettings(); diff --git a/application/src/main/java/org/thingsboard/server/service/ai/AiRequestsExecutorProperties.java b/application/src/main/java/org/thingsboard/server/service/ai/AiRequestsExecutorProperties.java new file mode 100644 index 0000000000..52a0d0d57e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ai/AiRequestsExecutorProperties.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ai; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.validation.annotation.Validated; + +@Data +@Validated +@Configuration +@ConfigurationProperties(prefix = "actors.rule.ai-requests-thread-pool") +class AiRequestsExecutorProperties { + + @NotBlank(message = "Pool name must be not blank") + private String poolName = "ai-requests"; + + @Min(value = 1, message = "Pool size must be at least 1") + private int poolSize = 50; + + @Min(value = 1, message = "Max queue size must be at least 1") + private int maxQueueSize = 10000; + + @Min(value = 1, message = "Termination timeout must be at least 1 second") + private int terminationTimeoutSeconds = 60; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/ai/DefaultAiRequestsExecutor.java b/application/src/main/java/org/thingsboard/server/service/ai/DefaultAiRequestsExecutor.java new file mode 100644 index 0000000000..40a5bdbb63 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ai/DefaultAiRequestsExecutor.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ai; + +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import dev.langchain4j.model.chat.ChatModel; +import dev.langchain4j.model.chat.request.ChatRequest; +import dev.langchain4j.model.chat.response.ChatResponse; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.rule.engine.api.AiRequestsExecutor; + +import java.time.Duration; + +@Lazy +@Component +@RequiredArgsConstructor +class DefaultAiRequestsExecutor implements AiRequestsExecutor { + + private final AiRequestsExecutorProperties properties; + + private ListeningExecutorService executorService; + + @PostConstruct + private void init() { + executorService = MoreExecutors.listeningDecorator( + ThingsBoardExecutors.newLimitedTasksExecutor(properties.getPoolSize(), properties.getMaxQueueSize(), properties.getPoolName()) + ); + } + + @Override + public FluentFuture sendChatRequestAsync(ChatModel chatModel, ChatRequest chatRequest) { + return FluentFuture.from(executorService.submit(() -> chatModel.chat(chatRequest))); + } + + @PreDestroy + private void destroy() { + if (executorService != null) { + MoreExecutors.shutdownAndAwaitTermination(executorService, Duration.ofSeconds(properties.getTerminationTimeoutSeconds())); + executorService = null; + } + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 54c1159003..dd41b13de0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -463,6 +463,16 @@ actors: allow_system_sms_service: "${ACTORS_RULE_ALLOW_SYSTEM_SMS_SERVICE:true}" # Specify thread pool size for external call service external_call_thread_pool_size: "${ACTORS_RULE_EXTERNAL_CALL_THREAD_POOL_SIZE:50}" + # Configuration for the thread pool that executes HTTP calls to AI provider APIs + ai-requests-thread-pool: + # The base name for threads + pool-name: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_NAME:ai-requests}" + # The maximum number of concurrent HTTP requests + pool-size: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_SIZE:50}" + # The maximum queue size for pending AI requests + max-queue-size: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_QUEUE_SIZE:10000}" + # The maximum time in seconds to wait for active tasks to complete during graceful shutdown + termination-timeout-seconds: "${ACTORS_RULE_AI_REQUESTS_THREAD_POOL_TERMINATION_TIMEOUT_SECONDS:60}" chain: # Errors for particular actors are persisted once per specified amount of milliseconds error_persist_frequency: "${ACTORS_RULE_CHAIN_ERROR_FREQUENCY:3000}" diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AiRequestsExecutor.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AiRequestsExecutor.java new file mode 100644 index 0000000000..9fd2a000ce --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AiRequestsExecutor.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FluentFuture; +import dev.langchain4j.model.chat.ChatModel; +import dev.langchain4j.model.chat.request.ChatRequest; +import dev.langchain4j.model.chat.response.ChatResponse; + +public interface AiRequestsExecutor { + + FluentFuture sendChatRequestAsync(ChatModel chatModel, ChatRequest chatRequest); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 4f9e786357..e5306c0193 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -422,6 +422,8 @@ public interface TbContext { AiSettingsService getAiSettingsService(); + AiRequestsExecutor getAiRequestsExecutor(); + // Configuration parameters for the MQTT client that is used in the MQTT node and Azure IoT hub node MqttClientSettings getMqttClientSettings(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java index def5d03477..955442e79b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java @@ -26,6 +26,7 @@ import dev.langchain4j.model.chat.request.ChatRequest; import dev.langchain4j.model.chat.request.ResponseFormat; import dev.langchain4j.model.chat.request.ResponseFormatType; import dev.langchain4j.model.chat.request.json.JsonSchema; +import dev.langchain4j.model.chat.response.ChatResponse; import dev.langchain4j.model.input.PromptTemplate; import org.checkerframework.checker.nullness.qual.NonNull; import org.thingsboard.common.util.JacksonUtil; @@ -130,10 +131,11 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode { .build(); configureChatModelAsync(ctx) - .transform(chatModel -> sendChatRequest(chatModel, chatRequest), ctx.getExternalCallExecutor()) + .transformAsync(chatModel -> ctx.getAiRequestsExecutor().sendChatRequestAsync(chatModel, chatRequest), directExecutor()) .addCallback(new FutureCallback<>() { @Override - public void onSuccess(String response) { + public void onSuccess(ChatResponse chatResponse) { + String response = chatResponse.aiMessage().text(); if (!isValidJsonObject(response)) { response = wrapInJsonObject(response); } @@ -165,10 +167,6 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode { }, ctx.getDbCallbackExecutor()); } - private String sendChatRequest(ChatModel chatModel, ChatRequest chatRequest) { - return chatModel.chat(chatRequest).aiMessage().text(); - } - private static boolean isValidJsonObject(String jsonString) { try { JsonNode result = JacksonUtil.toJsonNode(jsonString);