diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 5c5eea32fd..c6bfef4971 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -24,6 +24,7 @@ import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; +import org.thingsboard.server.dao.ai.AiModelService; import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; @@ -59,6 +60,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.ai.AiModelProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; @@ -261,6 +263,11 @@ public class EdgeContextComponent { @Autowired private CalculatedFieldProcessor calculatedFieldProcessor; + @Autowired + private AiModelService aiModelService; + @Autowired + private AiModelProcessor aiModelProcessor; + public EdgeProcessor getProcessor(EdgeEventType edgeEventType) { EdgeProcessor processor = processorMap.get(edgeEventType); if (processor == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index fda9a1d17b..825b911403 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -113,7 +113,7 @@ public class EdgeEventSourcingListener { return; } try { - if (EntityType.TENANT == entityType || EntityType.EDGE == entityType || EntityType.AI_MODEL == entityType) { + if (EntityType.TENANT == entityType || EntityType.EDGE == entityType) { return; } log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event); @@ -227,7 +227,7 @@ public class EdgeEventSourcingListener { break; case TENANT: return !event.getCreated(); - case API_USAGE_STATE, EDGE, AI_MODEL: + case API_USAGE_STATE, EDGE: return false; case DOMAIN: if (entity instanceof Domain domain) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java index 192c56692d..505b03a1cd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java @@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.ai.AiModel; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.asset.Asset; @@ -52,6 +53,7 @@ import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.domain.DomainInfo; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.AiModelId; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -86,6 +88,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.common.data.widget.WidgetTypeDetails; import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; @@ -654,4 +657,17 @@ public class EdgeMsgConstructorUtils { .setIdLSB(calculatedFieldId.getId().getLeastSignificantBits()).build(); } + public static AiModelUpdateMsg constructAiModelUpdatedMsg(UpdateMsgType msgType, AiModel aiModel) { + return AiModelUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(aiModel)) + .setIdMSB(aiModel.getId().getId().getMostSignificantBits()) + .setIdLSB(aiModel.getId().getId().getLeastSignificantBits()).build(); + } + + public static AiModelUpdateMsg constructAiModelDeleteMsg(AiModelId aiModelId) { + return AiModelUpdateMsg.newBuilder() + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE) + .setIdMSB(aiModelId.getId().getMostSignificantBits()) + .setIdLSB(aiModelId.getId().getLeastSignificantBits()).build(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index a86409c6af..e4297e9bfc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.dao.edge.stats.EdgeStatsKey; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; @@ -934,6 +935,11 @@ public abstract class EdgeGrpcSession implements Closeable { result.add(ctx.getCalculatedFieldProcessor().processCalculatedFieldMsgFromEdge(edge.getTenantId(), edge, calculatedFieldUpdateMsg)); } } + if (uplinkMsg.getAiModelUpdateMsgCount() > 0) { + for (AiModelUpdateMsg aiModelUpdateMsg : uplinkMsg.getAiModelUpdateMsgList()) { + result.add(ctx.getAiModelProcessor().processAiModelMsgFromEdge(edge.getTenantId(), edge, aiModelUpdateMsg)); + } + } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index a2243a88d2..d6bfeae0b7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -139,7 +139,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { UPDATED_COMMENT, DELETED -> true; default -> switch (type) { case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, - WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, NOTIFICATION_TEMPLATE, + WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, AI_MODEL, NOTIFICATION_TEMPLATE, NOTIFICATION_TARGET, NOTIFICATION_RULE -> true; default -> false; }; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java new file mode 100644 index 0000000000..74ca7ac27a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java @@ -0,0 +1,130 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.ai; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.ai.AiModel; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.AiModelId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils; + +import java.util.Optional; +import java.util.UUID; + +@Slf4j +@Component +@TbCoreComponent +public class AiModelEdgeProcessor extends BaseAiModelProcessor implements AiModelProcessor { + + @Override + public ListenableFuture processAiModelMsgFromEdge(TenantId tenantId, Edge edge, AiModelUpdateMsg aiModelUpdateMsg) { + AiModelId aiModelId = new AiModelId(new UUID(aiModelUpdateMsg.getIdMSB(), aiModelUpdateMsg.getIdLSB())); + try { + edgeSynchronizationManager.getEdgeId().set(edge.getId()); + + switch (aiModelUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + processAiModel(tenantId, aiModelId, aiModelUpdateMsg, edge); + return Futures.immediateFuture(null); + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(aiModelUpdateMsg.getMsgType()); + } + } catch (DataValidationException e) { + return Futures.immediateFailedFuture(e); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertEdgeEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + AiModelId aiModelId = new AiModelId(edgeEvent.getEntityId()); + switch (edgeEvent.getAction()) { + case ADDED, UPDATED -> { + Optional aiModel = edgeCtx.getAiModelService().findAiModelById(edgeEvent.getTenantId(), aiModelId); + if (aiModel.isPresent()) { + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + AiModelUpdateMsg aiModelUpdateMsg = EdgeMsgConstructorUtils.constructAiModelUpdatedMsg(msgType, aiModel.get()); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAiModelUpdateMsg(aiModelUpdateMsg) + .build(); + } + } + case DELETED -> { + AiModelUpdateMsg aiModelUpdateMsg = EdgeMsgConstructorUtils.constructAiModelDeleteMsg(aiModelId); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAiModelUpdateMsg(aiModelUpdateMsg) + .build(); + } + } + return null; + } + + @Override + public EdgeEventType getEdgeEventType() { + return EdgeEventType.AI_MODEL; + } + + private void processAiModel(TenantId tenantId, AiModelId aiModelId, AiModelUpdateMsg aiModelUpdateMsg, Edge edge) { + Pair resultPair = super.saveOrUpdateAiModel(tenantId, aiModelId, aiModelUpdateMsg); + Boolean wasCreated = resultPair.getFirst(); + if (wasCreated) { + pushAiModelCreatedEventToRuleEngine(tenantId, edge, aiModelId); + } + Boolean nameWasUpdated = resultPair.getSecond(); + if (nameWasUpdated) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.AI_MODEL, EdgeEventActionType.UPDATED, aiModelId, null); + } + } + + private void pushAiModelCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AiModelId aiModelId) { + try { + Optional aiModel = edgeCtx.getAiModelService().findAiModelById(tenantId, aiModelId); + if (aiModel.isPresent()) { + String aiModelAsString = JacksonUtil.toString(aiModel.get()); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, edge.getCustomerId()); + pushEntityEventToRuleEngine(tenantId, aiModelId, edge.getCustomerId(), TbMsgType.ENTITY_CREATED, aiModelAsString, msgMetaData); + } else { + log.warn("[{}][{}] Failed to find aiModel", tenantId, aiModelId); + } + } catch (Exception e) { + log.warn("[{}][{}] Failed to push aiModel action to rule engine: {}", tenantId, aiModelId, TbMsgType.ENTITY_CREATED.name(), e); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelProcessor.java new file mode 100644 index 0000000000..f66421167a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelProcessor.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.ai; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; +import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; + +public interface AiModelProcessor extends EdgeProcessor { + + ListenableFuture processAiModelMsgFromEdge(TenantId tenantId, Edge edge, AiModelUpdateMsg aiModelUpdateMsg); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/BaseAiModelProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/BaseAiModelProcessor.java new file mode 100644 index 0000000000..cb1d27e0a1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/BaseAiModelProcessor.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.ai; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.ai.AiModel; +import org.thingsboard.server.common.data.id.AiModelId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.service.DataValidator; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +import java.util.Optional; + +@Slf4j +public abstract class BaseAiModelProcessor extends BaseEdgeProcessor { + + @Autowired + private DataValidator aiModelValidator; + + protected Pair saveOrUpdateAiModel(TenantId tenantId, AiModelId aiModelId, AiModelUpdateMsg aiModelUpdateMsg) { + boolean isCreated = false; + boolean isNameUpdated = false; + try { + AiModel aiModel = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true); + if (aiModel == null) { + throw new RuntimeException("[{" + tenantId + "}] aiModelUpdateMsg {" + aiModelUpdateMsg + " } cannot be converted to aiModel"); + } + + Optional aiModelById = edgeCtx.getAiModelService().findAiModelById(tenantId, aiModelId); + if (aiModelById.isEmpty()) { + aiModel.setCreatedTime(Uuids.unixTimestamp(aiModelId.getId())); + isCreated = true; + aiModel.setId(null); + } else { + aiModel.setId(aiModelId); + } + + String aiModelName = aiModel.getName(); + Optional aiModelByName = edgeCtx.getAiModelService().findAiModelByTenantIdAndName(aiModel.getTenantId(), aiModelName); + if (aiModelByName.isPresent() && !aiModelByName.get().getId().equals(aiModelId)) { + aiModelName = aiModelName + "_" + StringUtils.randomAlphabetic(15); + log.warn("[{}] aiModel with name {} already exists. Renaming aiModel name to {}", + tenantId, aiModel.getName(), aiModelByName.get().getName()); + isNameUpdated = true; + } + aiModel.setName(aiModelName); + + aiModelValidator.validate(aiModel, AiModel::getTenantId); + + if (isCreated) { + aiModel.setId(aiModelId); + } + + edgeCtx.getAiModelService().save(aiModel, false); + } catch (Exception e) { + log.error("[{}] Failed to process aiModel update msg [{}]", tenantId, aiModelUpdateMsg, e); + throw e; + } + return Pair.of(isCreated, isNameUpdated); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java new file mode 100644 index 0000000000..66a681e7b7 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java @@ -0,0 +1,197 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edge; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import org.junit.Assert; +import org.junit.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.ai.AiModel; +import org.thingsboard.server.common.data.ai.model.chat.OpenAiChatModelConfig; +import org.thingsboard.server.common.data.ai.provider.OpenAiProviderConfig; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.gen.edge.v1.UplinkMsg; +import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg; + +import java.util.Optional; +import java.util.UUID; + +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class AiModelEdgeTest extends AbstractEdgeTest { + + private static final String DEFAULT_AI_MODEL_NAME = "Edge Test AiModel"; + private static final String UPDATED_AI_MODEL_NAME = "Updated Edge Test AiModel"; + + @Test + public void testAiModel_create_update_delete() throws Exception { + // create AiModel + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + + edgeImitator.expectMessageAmount(1); + AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + AiModelUpdateMsg aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB()); + Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB()); + AiModel aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + + Assert.assertEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName()); + Assert.assertEquals(savedAiModel.getTenantId(), aiModelFromMsg.getTenantId()); + + // update AiModel + edgeImitator.expectMessageAmount(1); + savedAiModel.setName(UPDATED_AI_MODEL_NAME); + savedAiModel = doPost("/api/ai/model", savedAiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(UPDATED_AI_MODEL_NAME, aiModelFromMsg.getName()); + + // delete AiModel + edgeImitator.expectMessageAmount(1); + doDelete("/api/ai/model/" + savedAiModel.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB()); + Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB()); + } + + @Test + public void testSendAiModelToCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName()); + } + + @Test + public void testUpdateAiModelNameOnCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName()); + + aiModel.setName(UPDATED_AI_MODEL_NAME); + UplinkMsg updatedUplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE); + + checkAiModelOnCloud(updatedUplinkMsg, uuid, aiModel.getName()); + } + + @Test + public void testAiModelToCloudWithNameThatAlreadyExistsOnCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + + edgeImitator.expectMessageAmount(1); + AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + UUID uuid = Uuids.timeBased(); + + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.expectMessageAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsg); + + Assert.assertTrue(edgeImitator.waitForResponses()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + Optional aiModelUpdateMsgOpt = edgeImitator.findMessageByType(AiModelUpdateMsg.class); + Assert.assertTrue(aiModelUpdateMsgOpt.isPresent()); + AiModelUpdateMsg latestAiModelUpdateMsg = aiModelUpdateMsgOpt.get(); + AiModel aiModelFromMsg = JacksonUtil.fromString(latestAiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName()); + + Assert.assertNotEquals(savedAiModel.getUuidId(), uuid); + + AiModel aiModelFromCloud = doGet("/api/ai/model/" + uuid, AiModel.class); + Assert.assertNotNull(aiModelFromCloud); + Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromCloud.getName()); + } + + private AiModel createSimpleAiModel(String name) { + AiModel aiModel = new AiModel(); + aiModel.setTenantId(tenantId); + aiModel.setName(name); + aiModel.setConfiguration(OpenAiChatModelConfig.builder() + .providerConfig(new OpenAiProviderConfig("test-api-key")) + .modelId("gpt-4o") + .temperature(0.5) + .topP(0.3) + .frequencyPenalty(0.1) + .presencePenalty(0.2) + .maxOutputTokens(1000) + .timeoutSeconds(60) + .maxRetries(2) + .build()); + return aiModel; + } + + private UplinkMsg getUplinkMsg(UUID uuid, AiModel aiModel, UpdateMsgType updateMsgType) throws InvalidProtocolBufferException { + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AiModelUpdateMsg.Builder aiModelUpdateMsgBuilder = AiModelUpdateMsg.newBuilder(); + aiModelUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); + aiModelUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); + aiModelUpdateMsgBuilder.setEntity(JacksonUtil.toString(aiModel)); + aiModelUpdateMsgBuilder.setMsgType(updateMsgType); + testAutoGeneratedCodeByProtobuf(aiModelUpdateMsgBuilder); + uplinkMsgBuilder.addAiModelUpdateMsg(aiModelUpdateMsgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + return uplinkMsgBuilder.build(); + } + + private void checkAiModelOnCloud(UplinkMsg uplinkMsg, UUID uuid, String resourceTitle) throws Exception { + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsg); + + Assert.assertTrue(edgeImitator.waitForResponses()); + + UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg(); + Assert.assertTrue(latestResponseMsg.getSuccess()); + + AiModel aiModel = doGet("/api/ai/model/" + uuid, AiModel.class); + Assert.assertNotNull(aiModel); + Assert.assertEquals(resourceTitle, aiModel.getName()); + } + +} diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index 16d10d9b6e..4594435626 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -29,6 +29,7 @@ import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.server.controller.AbstractWebTest; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; @@ -358,6 +359,11 @@ public class EdgeImitator { result.add(saveDownlinkMsg(calculatedFieldUpdateMsg)); } } + if (downlinkMsg.getAiModelUpdateMsgCount() > 0) { + for (AiModelUpdateMsg aiModelUpdateMsg : downlinkMsg.getAiModelUpdateMsgList()) { + result.add(saveDownlinkMsg(aiModelUpdateMsg)); + } + } if (downlinkMsg.hasEdgeConfiguration()) { result.add(saveDownlinkMsg(downlinkMsg.getEdgeConfiguration())); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/ai/AiModelService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/ai/AiModelService.java index 3ad12048cf..55abc80998 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/ai/AiModelService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/ai/AiModelService.java @@ -29,6 +29,8 @@ public interface AiModelService extends EntityDaoService { AiModel save(AiModel model); + AiModel save(AiModel model, boolean doValidate); + Optional findAiModelById(TenantId tenantId, AiModelId modelId); PageData findAiModelsByTenantId(TenantId tenantId, PageLink pageLink); @@ -37,6 +39,8 @@ public interface AiModelService extends EntityDaoService { FluentFuture> findAiModelByTenantIdAndIdAsync(TenantId tenantId, AiModelId modelId); + Optional findAiModelByTenantIdAndName(TenantId tenantId, String name); + boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index 0d5c3f34ad..fadb44207a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -47,7 +47,8 @@ public enum EdgeEventType { TB_RESOURCE(true, EntityType.TB_RESOURCE), OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT), DOMAIN(true, EntityType.DOMAIN), - CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD); + CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD), + AI_MODEL(true, EntityType.AI_MODEL); private final boolean allEdgesRelated; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index fc1c1cd1a9..09d34ff10b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -113,6 +113,7 @@ public class EntityIdFactory { case OAUTH2_CLIENT -> new OAuth2ClientId(uuid); case DOMAIN -> new DomainId(uuid); case CALCULATED_FIELD -> new CalculatedFieldId(uuid); + case AI_MODEL -> new AiModelId(uuid); default -> throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); }; } diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 0eff7ad053..dbaef23431 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -136,7 +136,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { .setConnectRequestMsg(ConnectRequestMsg.newBuilder() .setEdgeRoutingKey(edgeKey) .setEdgeSecret(edgeSecret) - .setEdgeVersion(EdgeVersion.V_4_2_0) + .setEdgeVersion(EdgeVersion.V_4_3_0) .setMaxInboundMessageSize(maxInboundMessageSize) .build()) .build()); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index dbda462a99..5a0aeac977 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -44,6 +44,7 @@ enum EdgeVersion { V_4_0_0 = 10; V_4_1_0 = 11; V_4_2_0 = 12; + V_4_3_0 = 13; V_LATEST = 999; } @@ -133,6 +134,12 @@ message CalculatedFieldUpdateMsg{ string entity = 4; } +message AiModelUpdateMsg{ + UpdateMsgType msgType = 1; + int64 idMSB = 2; + int64 idLSB = 3; + string entity = 4; +} message EntityDataProto { int64 entityIdMSB = 1; @@ -441,6 +448,7 @@ message UplinkMsg { repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24; repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25; repeated CalculatedFieldRequestMsg calculatedFieldRequestMsg = 26; + repeated AiModelUpdateMsg aiModelUpdateMsg = 27; } message UplinkResponseMsg { @@ -491,4 +499,5 @@ message DownlinkMsg { repeated NotificationTemplateUpdateMsg notificationTemplateUpdateMsg = 33; repeated OAuth2DomainUpdateMsg oAuth2DomainUpdateMsg = 34; repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 35; + repeated AiModelUpdateMsg aiModelUpdateMsg = 36; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java index b091a29247..7cdf5b027d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java @@ -29,12 +29,15 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.CachedVersionedEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.model.sql.AiModelEntity; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.sql.JpaExecutorService; import java.util.Optional; import java.util.Set; +import java.util.UUID; import static org.thingsboard.server.dao.service.Validator.validatePageLink; @@ -63,11 +66,23 @@ class AiModelServiceImpl extends CachedVersionedEntityService findAiModelByTenantIdAndId(tenantId, modelId))); } + @Override + public Optional findAiModelByTenantIdAndName(TenantId tenantId, String name) { + return Optional.ofNullable(aiModelDao.findByTenantIdAndName(tenantId.getId(), name)); + } + @Override @Transactional public boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId) { - return deleteByTenantIdAndIdInternal(tenantId, modelId); + return deleteByTenantIdAndIdInternal(tenantId, modelId.getId()); } @Override @@ -123,14 +143,21 @@ class AiModelServiceImpl extends CachedVersionedEntityService>() { @@ -3094,7 +3096,7 @@ public class RestClient implements Closeable { addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params); return restTemplate.exchange( baseURL + "/api/widgetTypesInfos?widgetsBundleId={widgetsBundleId}&" + getUrlParams(pageLink) + - getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), + getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference>() { @@ -4144,6 +4146,29 @@ public class RestClient implements Closeable { } } + public AiModel saveAiModel(AiModel aiModel) { + return restTemplate.postForEntity(baseURL + "/api/ai/model", aiModel, AiModel.class).getBody(); + } + + public Optional getAiModel(AiModelId aiModelId) { + try { + ResponseEntity response = restTemplate.getForEntity( + baseURL + "/api/aiModel/{aiModelId}", AiModel.class, aiModelId.getId()); + return Optional.ofNullable(response.getBody()); + } catch (HttpClientErrorException exception) { + if (exception.getStatusCode() == HttpStatus.NOT_FOUND) { + return Optional.empty(); + } else { + throw exception; + } + } + } + + public void deleteAiModel(AiModelId aiModelId) { + restTemplate.delete(baseURL + "/api/aiModel/{aiModelId}", aiModelId.getId()); + } + + private String getTimeUrlParams(TimePageLink pageLink) { String urlParams = getUrlParams(pageLink); if (pageLink.getStartTime() != null) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java index 654ed3f448..a9ebb7ecb7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/ai/TbAiNode.java @@ -92,7 +92,6 @@ import static org.thingsboard.server.dao.service.ConstraintValidator.validateFie configClazz = TbAiNodeConfiguration.class, configDirective = "tbExternalNodeAiConfig", iconUrl = "", - ruleChainTypes = RuleChainType.CORE, docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/ai-request/" ) public final class TbAiNode extends TbAbstractExternalNode implements TbNode {