diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index fda9a1d17b..825b911403 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -113,7 +113,7 @@ public class EdgeEventSourcingListener { return; } try { - if (EntityType.TENANT == entityType || EntityType.EDGE == entityType || EntityType.AI_MODEL == entityType) { + if (EntityType.TENANT == entityType || EntityType.EDGE == entityType) { return; } log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event); @@ -227,7 +227,7 @@ public class EdgeEventSourcingListener { break; case TENANT: return !event.getCreated(); - case API_USAGE_STATE, EDGE, AI_MODEL: + case API_USAGE_STATE, EDGE: return false; case DOMAIN: if (entity instanceof Domain domain) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java index aaa1f05aa7..cd932ece5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ai/AiModelEdgeProcessor.java @@ -58,12 +58,6 @@ public class AiModelEdgeProcessor extends BaseAiModelProcessor implements AiMode case ENTITY_UPDATED_RPC_MESSAGE: processAiModel(tenantId, aiModelId, aiModelUpdateMsg, edge); return Futures.immediateFuture(null); - case ENTITY_DELETED_RPC_MESSAGE: - Optional aiModel = edgeCtx.getAiModelService().findAiModelById(tenantId, aiModelId); - if (aiModel.isPresent()) { - edgeCtx.getAiModelService().deleteByTenantIdAndId(tenantId, aiModelId); - } - return Futures.immediateFuture(null); case UNRECOGNIZED: default: return handleUnsupportedMsgType(aiModelUpdateMsg.getMsgType()); @@ -111,33 +105,6 @@ public class AiModelEdgeProcessor extends BaseAiModelProcessor implements AiMode return EdgeEventType.AI_MODEL; } -// @Override -// public ListenableFuture processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { -// EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); -// EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); -// EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(type, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); -// EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); -// -// switch (actionType) { -// case UPDATED: -// case ADDED: -// EntityId calculatedFieldOwnerId = JacksonUtil.fromString(edgeNotificationMsg.getBody(), EntityId.class); -// if (calculatedFieldOwnerId != null && -// (EntityType.DEVICE.equals(calculatedFieldOwnerId.getEntityType()) || EntityType.ASSET.equals(calculatedFieldOwnerId.getEntityType()))) { -// JsonNode body = JacksonUtil.toJsonNode(edgeNotificationMsg.getBody()); -// EdgeId edgeId = safeGetEdgeId(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()); -// -// return edgeId != null ? -// saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) : -// processNotificationToRelatedEdges(tenantId, calculatedFieldOwnerId, entityId, type, actionType, originatorEdgeId); -// } else { -// return processActionForAllEdges(tenantId, type, actionType, entityId, null, originatorEdgeId); -// } -// default: -// return super.processEntityNotification(tenantId, edgeNotificationMsg); -// } -// } - private void processAiModel(TenantId tenantId, AiModelId aiModelId, AiModelUpdateMsg aiModelUpdateMsg, Edge edge) { Pair resultPair = super.saveOrUpdateAiModel(tenantId, aiModelId, aiModelUpdateMsg); Boolean wasCreated = resultPair.getFirst(); @@ -158,7 +125,7 @@ public class AiModelEdgeProcessor extends BaseAiModelProcessor implements AiMode TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, edge.getCustomerId()); pushEntityEventToRuleEngine(tenantId, aiModelId, edge.getCustomerId(), TbMsgType.ENTITY_CREATED, aiModelAsString, msgMetaData); } else { - log.warn("[{}][{}] Failed to find AiModel", tenantId, aiModelId); + log.warn("[{}][{}] Failed to find aiModel", tenantId, aiModelId); } } catch (Exception e) { log.warn("[{}][{}] Failed to push aiModel action to rule engine: {}", tenantId, aiModelId, TbMsgType.ENTITY_CREATED.name(), e); diff --git a/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java new file mode 100644 index 0000000000..66a681e7b7 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/edge/AiModelEdgeTest.java @@ -0,0 +1,197 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edge; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.InvalidProtocolBufferException; +import org.junit.Assert; +import org.junit.Test; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.ai.AiModel; +import org.thingsboard.server.common.data.ai.model.chat.OpenAiChatModelConfig; +import org.thingsboard.server.common.data.ai.provider.OpenAiProviderConfig; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.gen.edge.v1.UplinkMsg; +import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg; + +import java.util.Optional; +import java.util.UUID; + +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +@DaoSqlTest +public class AiModelEdgeTest extends AbstractEdgeTest { + + private static final String DEFAULT_AI_MODEL_NAME = "Edge Test AiModel"; + private static final String UPDATED_AI_MODEL_NAME = "Updated Edge Test AiModel"; + + @Test + public void testAiModel_create_update_delete() throws Exception { + // create AiModel + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + + edgeImitator.expectMessageAmount(1); + AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + AiModelUpdateMsg aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB()); + Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB()); + AiModel aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + + Assert.assertEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName()); + Assert.assertEquals(savedAiModel.getTenantId(), aiModelFromMsg.getTenantId()); + + // update AiModel + edgeImitator.expectMessageAmount(1); + savedAiModel.setName(UPDATED_AI_MODEL_NAME); + savedAiModel = doPost("/api/ai/model", savedAiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(UPDATED_AI_MODEL_NAME, aiModelFromMsg.getName()); + + // delete AiModel + edgeImitator.expectMessageAmount(1); + doDelete("/api/ai/model/" + savedAiModel.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg); + aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType()); + Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB()); + Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB()); + } + + @Test + public void testSendAiModelToCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName()); + } + + @Test + public void testUpdateAiModelNameOnCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName()); + + aiModel.setName(UPDATED_AI_MODEL_NAME); + UplinkMsg updatedUplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE); + + checkAiModelOnCloud(updatedUplinkMsg, uuid, aiModel.getName()); + } + + @Test + public void testAiModelToCloudWithNameThatAlreadyExistsOnCloud() throws Exception { + AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME); + + edgeImitator.expectMessageAmount(1); + AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + + UUID uuid = Uuids.timeBased(); + + UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.expectMessageAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsg); + + Assert.assertTrue(edgeImitator.waitForResponses()); + Assert.assertTrue(edgeImitator.waitForMessages()); + + Optional aiModelUpdateMsgOpt = edgeImitator.findMessageByType(AiModelUpdateMsg.class); + Assert.assertTrue(aiModelUpdateMsgOpt.isPresent()); + AiModelUpdateMsg latestAiModelUpdateMsg = aiModelUpdateMsgOpt.get(); + AiModel aiModelFromMsg = JacksonUtil.fromString(latestAiModelUpdateMsg.getEntity(), AiModel.class, true); + Assert.assertNotNull(aiModelFromMsg); + Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName()); + + Assert.assertNotEquals(savedAiModel.getUuidId(), uuid); + + AiModel aiModelFromCloud = doGet("/api/ai/model/" + uuid, AiModel.class); + Assert.assertNotNull(aiModelFromCloud); + Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromCloud.getName()); + } + + private AiModel createSimpleAiModel(String name) { + AiModel aiModel = new AiModel(); + aiModel.setTenantId(tenantId); + aiModel.setName(name); + aiModel.setConfiguration(OpenAiChatModelConfig.builder() + .providerConfig(new OpenAiProviderConfig("test-api-key")) + .modelId("gpt-4o") + .temperature(0.5) + .topP(0.3) + .frequencyPenalty(0.1) + .presencePenalty(0.2) + .maxOutputTokens(1000) + .timeoutSeconds(60) + .maxRetries(2) + .build()); + return aiModel; + } + + private UplinkMsg getUplinkMsg(UUID uuid, AiModel aiModel, UpdateMsgType updateMsgType) throws InvalidProtocolBufferException { + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AiModelUpdateMsg.Builder aiModelUpdateMsgBuilder = AiModelUpdateMsg.newBuilder(); + aiModelUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); + aiModelUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); + aiModelUpdateMsgBuilder.setEntity(JacksonUtil.toString(aiModel)); + aiModelUpdateMsgBuilder.setMsgType(updateMsgType); + testAutoGeneratedCodeByProtobuf(aiModelUpdateMsgBuilder); + uplinkMsgBuilder.addAiModelUpdateMsg(aiModelUpdateMsgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + return uplinkMsgBuilder.build(); + } + + private void checkAiModelOnCloud(UplinkMsg uplinkMsg, UUID uuid, String resourceTitle) throws Exception { + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsg); + + Assert.assertTrue(edgeImitator.waitForResponses()); + + UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg(); + Assert.assertTrue(latestResponseMsg.getSuccess()); + + AiModel aiModel = doGet("/api/ai/model/" + uuid, AiModel.class); + Assert.assertNotNull(aiModel); + Assert.assertEquals(resourceTitle, aiModel.getName()); + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index fc1c1cd1a9..09d34ff10b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -113,6 +113,7 @@ public class EntityIdFactory { case OAUTH2_CLIENT -> new OAuth2ClientId(uuid); case DOMAIN -> new DomainId(uuid); case CALCULATED_FIELD -> new CalculatedFieldId(uuid); + case AI_MODEL -> new AiModelId(uuid); default -> throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); }; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java index 21720813c7..f898c7aa4f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/ai/AiModelServiceImpl.java @@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.CachedVersionedEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.model.sql.AiModelEntity; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.sql.JpaExecutorService; @@ -67,14 +69,19 @@ class AiModelServiceImpl extends CachedVersionedEntityService