AI rule node: add API for sending arbitrary chat requests to LLM model

This commit is contained in:
Dmytro Skarzhynets 2025-07-02 19:46:02 +03:00
parent 944d80df9d
commit a2e7ff293b
No known key found for this signature in database
GPG Key ID: 2B51652F224037DF
14 changed files with 367 additions and 43 deletions

View File

@ -30,7 +30,6 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
@ -322,10 +321,6 @@ public class ActorSystemContext {
@Getter
private AiModelSettingsService aiModelSettingsService;
@Autowired
@Getter
private AiRequestsExecutor aiRequestsExecutor;
@Autowired
@Getter
private EntityViewService entityViewService;

View File

@ -23,7 +23,6 @@ import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.JobManager;
import org.thingsboard.rule.engine.api.MailService;
@ -1037,11 +1036,6 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getAiModelSettingsService();
}
@Override
public AiRequestsExecutor getAiRequestsExecutor() {
return mainCtx.getAiRequestsExecutor();
}
@Override
public MqttClientSettings getMqttClientSettings() {
return mainCtx.getMqttClientSettings();

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.google.common.util.concurrent.ListenableFuture;
import dev.langchain4j.model.chat.request.ChatRequest;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.ai.dto.TbChatRequest;
import org.thingsboard.server.common.data.ai.dto.TbChatResponse;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModel;
import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.service.ai.AiModelService;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH;
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/ai/model")
class AiModelController extends BaseController {
private final AiModelService aiModelService;
@ApiOperation(
value = "Send request to AI chat model (sendChatRequest)",
notes = "Submits a single prompt - made up of an optional system message and a required user message - to the specified AI chat model " +
"and returns either the generated answer or an error envelope." +
TENANT_AUTHORITY_PARAGRAPH
)
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@PostMapping("/chat")
public DeferredResult<TbChatResponse> sendChatRequest(@Valid @RequestBody TbChatRequest tbChatRequest) {
ChatRequest langChainChatRequest = tbChatRequest.toLangChainChatRequest();
AiChatModel<?> chatModel = tbChatRequest.chatModel();
ListenableFuture<TbChatResponse> future = aiModelService.sendChatRequestAsync(chatModel, langChainChatRequest)
.transform(chatResponse -> (TbChatResponse) new TbChatResponse.Success(chatResponse.aiMessage().text()), directExecutor())
.catching(Throwable.class, ex -> new TbChatResponse.Failure(ex.getMessage()), directExecutor());
return wrapFuture(future);
}
}

View File

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ai;
import org.thingsboard.rule.engine.api.RuleEngineAiModelService;
public interface AiModelService extends RuleEngineAiModelService {}

View File

@ -15,23 +15,27 @@
*/
package org.thingsboard.server.service.ai;
import com.google.common.util.concurrent.FluentFuture;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.ChatResponse;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineAiModelService;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModel;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModelConfig;
import org.thingsboard.server.common.data.ai.model.chat.Langchain4jChatModelConfigurer;
@Service
@RequiredArgsConstructor
class AiModelServiceImpl implements RuleEngineAiModelService {
class AiModelServiceImpl implements AiModelService {
private final Langchain4jChatModelConfigurer chatModelConfigurer;
private final AiRequestsExecutor aiRequestsExecutor;
@Override
public <C extends AiChatModelConfig<C>> ChatModel configureChatModel(AiChatModel<C> chatModel) {
return chatModel.configure(chatModelConfigurer);
public <C extends AiChatModelConfig<C>> FluentFuture<ChatResponse> sendChatRequestAsync(AiChatModel<C> chatModel, ChatRequest chatRequest) {
ChatModel lc4jChatModel = chatModel.configure(chatModelConfigurer);
return aiRequestsExecutor.sendChatRequestAsync(lc4jChatModel, chatRequest);
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
package org.thingsboard.server.service.ai;
import com.google.common.util.concurrent.FluentFuture;
import dev.langchain4j.model.chat.ChatModel;

View File

@ -33,7 +33,6 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AiRequestsExecutor;
import java.time.Duration;
import java.util.concurrent.Executors;

View File

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.ai.dto;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.Content;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.request.ChatRequest;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModel;
import java.util.ArrayList;
import java.util.List;
public record TbChatRequest(
@Schema(
requiredMode = Schema.RequiredMode.NOT_REQUIRED,
accessMode = Schema.AccessMode.READ_WRITE,
description = "A system-level instruction that frames the user's input, setting the persona, tone, and constraints for the generated response",
example = "You are a helpful assistant. Only output valid JSON."
)
String systemMessage,
@Schema(
requiredMode = Schema.RequiredMode.REQUIRED,
accessMode = Schema.AccessMode.READ_WRITE,
description = "The actual user prompt that will be answered by the AI model"
)
@NotNull @Valid
TbUserMessage userMessage,
@Schema(
requiredMode = Schema.RequiredMode.REQUIRED,
accessMode = Schema.AccessMode.READ_WRITE,
description = "Configuration of the AI chat model that should execute the request"
)
@NotNull @Valid
AiChatModel<?> chatModel
) {
public ChatRequest toLangChainChatRequest() {
return ChatRequest.builder()
.messages(getLangChainMessages())
.build();
}
private List<ChatMessage> getLangChainMessages() {
List<ChatMessage> messages = new ArrayList<>(2);
if (systemMessage != null) {
messages.add(SystemMessage.from(systemMessage));
}
List<Content> langChainContents = userMessage.contents().stream()
.map(TbContent::toLangChainContent)
.toList();
messages.add(UserMessage.from(langChainContents));
return messages;
}
}

View File

@ -0,0 +1,68 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.ai.dto;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.v3.oas.annotations.media.Schema;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "status",
include = JsonTypeInfo.As.PROPERTY,
visible = true
)
@JsonSubTypes({
@JsonSubTypes.Type(value = TbChatResponse.Success.class, name = "SUCCESS"),
@JsonSubTypes.Type(value = TbChatResponse.Failure.class, name = "FAILURE")
})
public sealed interface TbChatResponse permits TbChatResponse.Success, TbChatResponse.Failure {
@Schema(
description = "Indicates whether the request was successful or not",
example = "SUCCESS"
)
String getStatus();
record Success(
@Schema(description = "The text content generated by the model")
String generatedContent
) implements TbChatResponse {
@Override
@Schema(example = "SUCCESS")
public String getStatus() {
return "SUCCESS";
}
}
record Failure(
@Schema(
description = "A string containing details about the failure"
)
String errorDetails
) implements TbChatResponse {
@Override
@Schema(example = "FAILURE")
public String getStatus() {
return "FAILURE";
}
}
}

View File

@ -0,0 +1,73 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.ai.dto;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import dev.langchain4j.data.message.Content;
import dev.langchain4j.data.message.TextContent;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import static org.thingsboard.server.common.data.ai.dto.TbContent.TbTextContent;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "contentType",
visible = true
)
@JsonSubTypes({
@JsonSubTypes.Type(value = TbTextContent.class, name = "TEXT")
})
public sealed interface TbContent permits TbTextContent {
TbContentType contentType();
Content toLangChainContent();
enum TbContentType {
TEXT
}
@Schema(
description = "Text-based content part of a user's prompt"
)
record TbTextContent(
@NotBlank
@Schema(
requiredMode = Schema.RequiredMode.REQUIRED,
description = "The text content",
example = "What is the weather like in Kyiv today?"
)
String text
) implements TbContent {
@Override
public TbContentType contentType() {
return TbContentType.TEXT;
}
@Override
public Content toLangChainContent() {
return TextContent.from(text);
}
}
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.ai.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import java.util.List;
public record TbUserMessage(
@NotEmpty
@Valid
@Schema(
requiredMode = Schema.RequiredMode.REQUIRED,
description = "A list of content parts that make up the complete user prompt"
)
List<TbContent> contents
) {}

View File

@ -15,12 +15,14 @@
*/
package org.thingsboard.rule.engine.api;
import dev.langchain4j.model.chat.ChatModel;
import com.google.common.util.concurrent.FluentFuture;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.response.ChatResponse;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModel;
import org.thingsboard.server.common.data.ai.model.chat.AiChatModelConfig;
public interface RuleEngineAiModelService {
<C extends AiChatModelConfig<C>> ChatModel configureChatModel(AiChatModel<C> chatModel);
<C extends AiChatModelConfig<C>> FluentFuture<ChatResponse> sendChatRequestAsync(AiChatModel<C> chatModel, ChatRequest chatRequest);
}

View File

@ -427,8 +427,6 @@ public interface TbContext {
AiModelSettingsService getAiModelSettingsService();
AiRequestsExecutor getAiRequestsExecutor();
// Configuration parameters for the MQTT client that is used in the MQTT node and Azure IoT hub node
MqttClientSettings getMqttClientSettings();

View File

@ -21,7 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.SystemMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.chat.request.ChatRequest;
import dev.langchain4j.model.chat.request.ResponseFormat;
import dev.langchain4j.model.chat.request.ResponseFormatType;
@ -119,29 +118,27 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode {
.responseFormat(responseFormat)
.build();
configureChatModelAsync(ctx)
.transformAsync(chatModel -> ctx.getAiRequestsExecutor().sendChatRequestAsync(chatModel, chatRequest), directExecutor())
.addCallback(new FutureCallback<>() {
@Override
public void onSuccess(ChatResponse chatResponse) {
String response = chatResponse.aiMessage().text();
if (!isValidJsonObject(response)) {
response = wrapInJsonObject(response);
}
tellSuccess(ctx, ackedMsg.transform()
.data(response)
.build());
}
sendChatRequestAsync(ctx, chatRequest).addCallback(new FutureCallback<>() {
@Override
public void onSuccess(ChatResponse chatResponse) {
String response = chatResponse.aiMessage().text();
if (!isValidJsonObject(response)) {
response = wrapInJsonObject(response);
}
tellSuccess(ctx, ackedMsg.transform()
.data(response)
.build());
}
@Override
public void onFailure(@NonNull Throwable t) {
tellFailure(ctx, ackedMsg, t);
}
}, directExecutor());
@Override
public void onFailure(@NonNull Throwable t) {
tellFailure(ctx, ackedMsg, t);
}
}, directExecutor());
}
private <C extends AiChatModelConfig<C>> FluentFuture<ChatModel> configureChatModelAsync(TbContext ctx) {
return ctx.getAiModelSettingsService().findAiModelSettingsByTenantIdAndIdAsync(ctx.getTenantId(), modelSettingsId).transform(settingsOpt -> {
private <C extends AiChatModelConfig<C>> FluentFuture<ChatResponse> sendChatRequestAsync(TbContext ctx, ChatRequest chatRequest) {
return ctx.getAiModelSettingsService().findAiModelSettingsByTenantIdAndIdAsync(ctx.getTenantId(), modelSettingsId).transformAsync(settingsOpt -> {
if (settingsOpt.isEmpty()) {
throw new NoSuchElementException("[" + ctx.getTenantId() + "] AI model settings with ID: [" + modelSettingsId + "] were not found");
}
@ -158,7 +155,7 @@ public final class TbAiNode extends TbAbstractExternalNode implements TbNode {
.withTimeoutSeconds(timeoutSeconds)
.withMaxRetries(0)); // disable retries to respect timeout set in rule node config
return ctx.getAiModelService().configureChatModel(chatModel);
return ctx.getAiModelService().sendChatRequestAsync(chatModel, chatRequest);
}, ctx.getDbCallbackExecutor());
}