Merge pull request #13998 from jekka001/support-ai-model-sync

[Edge] Added AI model sync
This commit is contained in:
Viacheslav Klimov 2025-10-14 10:52:34 +03:00 committed by GitHub
commit a538d90e0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 553 additions and 16 deletions

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.dao.ai.AiModelService;
import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmCommentService;
import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetProfileService;
@ -59,6 +60,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings; import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.ai.AiModelProcessor;
import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor;
import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor;
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor;
@ -261,6 +263,11 @@ public class EdgeContextComponent {
@Autowired @Autowired
private CalculatedFieldProcessor calculatedFieldProcessor; private CalculatedFieldProcessor calculatedFieldProcessor;
@Autowired
private AiModelService aiModelService;
@Autowired
private AiModelProcessor aiModelProcessor;
public EdgeProcessor getProcessor(EdgeEventType edgeEventType) { public EdgeProcessor getProcessor(EdgeEventType edgeEventType) {
EdgeProcessor processor = processorMap.get(edgeEventType); EdgeProcessor processor = processorMap.get(edgeEventType);
if (processor == null) { if (processor == null) {

View File

@ -113,7 +113,7 @@ public class EdgeEventSourcingListener {
return; return;
} }
try { try {
if (EntityType.TENANT == entityType || EntityType.EDGE == entityType || EntityType.AI_MODEL == entityType) { if (EntityType.TENANT == entityType || EntityType.EDGE == entityType) {
return; return;
} }
log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event); log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event);
@ -227,7 +227,7 @@ public class EdgeEventSourcingListener {
break; break;
case TENANT: case TENANT:
return !event.getCreated(); return !event.getCreated();
case API_USAGE_STATE, EDGE, AI_MODEL: case API_USAGE_STATE, EDGE:
return false; return false;
case DOMAIN: case DOMAIN:
if (entity instanceof Domain domain) { if (entity instanceof Domain domain) {

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.ai.AiModel;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
@ -52,6 +53,7 @@ import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.domain.DomainInfo; import org.thingsboard.server.common.data.domain.DomainInfo;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AiModelId;
import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -86,6 +88,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.common.data.security.UserCredentials;
import org.thingsboard.server.common.data.widget.WidgetTypeDetails; import org.thingsboard.server.common.data.widget.WidgetTypeDetails;
import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
@ -654,4 +657,17 @@ public class EdgeMsgConstructorUtils {
.setIdLSB(calculatedFieldId.getId().getLeastSignificantBits()).build(); .setIdLSB(calculatedFieldId.getId().getLeastSignificantBits()).build();
} }
public static AiModelUpdateMsg constructAiModelUpdatedMsg(UpdateMsgType msgType, AiModel aiModel) {
return AiModelUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(aiModel))
.setIdMSB(aiModel.getId().getId().getMostSignificantBits())
.setIdLSB(aiModel.getId().getId().getLeastSignificantBits()).build();
}
public static AiModelUpdateMsg constructAiModelDeleteMsg(AiModelId aiModelId) {
return AiModelUpdateMsg.newBuilder()
.setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
.setIdMSB(aiModelId.getId().getMostSignificantBits())
.setIdLSB(aiModelId.getId().getLeastSignificantBits()).build();
}
} }

View File

@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.dao.edge.stats.EdgeStatsKey; import org.thingsboard.server.dao.edge.stats.EdgeStatsKey;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
@ -934,6 +935,11 @@ public abstract class EdgeGrpcSession implements Closeable {
result.add(ctx.getCalculatedFieldProcessor().processCalculatedFieldMsgFromEdge(edge.getTenantId(), edge, calculatedFieldUpdateMsg)); result.add(ctx.getCalculatedFieldProcessor().processCalculatedFieldMsgFromEdge(edge.getTenantId(), edge, calculatedFieldUpdateMsg));
} }
} }
if (uplinkMsg.getAiModelUpdateMsgCount() > 0) {
for (AiModelUpdateMsg aiModelUpdateMsg : uplinkMsg.getAiModelUpdateMsgList()) {
result.add(ctx.getAiModelProcessor().processAiModelMsgFromEdge(edge.getTenantId(), edge, aiModelUpdateMsg));
}
}
} catch (Exception e) { } catch (Exception e) {
String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e); log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e);

View File

@ -139,7 +139,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
UPDATED_COMMENT, DELETED -> true; UPDATED_COMMENT, DELETED -> true;
default -> switch (type) { default -> switch (type) {
case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE,
WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, NOTIFICATION_TEMPLATE, WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, CALCULATED_FIELD, AI_MODEL, NOTIFICATION_TEMPLATE,
NOTIFICATION_TARGET, NOTIFICATION_RULE -> true; NOTIFICATION_TARGET, NOTIFICATION_RULE -> true;
default -> false; default -> false;
}; };

View File

@ -0,0 +1,130 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.ai;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.ai.AiModel;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.AiModelId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@Component
@TbCoreComponent
public class AiModelEdgeProcessor extends BaseAiModelProcessor implements AiModelProcessor {
@Override
public ListenableFuture<Void> processAiModelMsgFromEdge(TenantId tenantId, Edge edge, AiModelUpdateMsg aiModelUpdateMsg) {
AiModelId aiModelId = new AiModelId(new UUID(aiModelUpdateMsg.getIdMSB(), aiModelUpdateMsg.getIdLSB()));
try {
edgeSynchronizationManager.getEdgeId().set(edge.getId());
switch (aiModelUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
processAiModel(tenantId, aiModelId, aiModelUpdateMsg, edge);
return Futures.immediateFuture(null);
case UNRECOGNIZED:
default:
return handleUnsupportedMsgType(aiModelUpdateMsg.getMsgType());
}
} catch (DataValidationException e) {
return Futures.immediateFailedFuture(e);
} finally {
edgeSynchronizationManager.getEdgeId().remove();
}
}
@Override
public DownlinkMsg convertEdgeEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) {
AiModelId aiModelId = new AiModelId(edgeEvent.getEntityId());
switch (edgeEvent.getAction()) {
case ADDED, UPDATED -> {
Optional<AiModel> aiModel = edgeCtx.getAiModelService().findAiModelById(edgeEvent.getTenantId(), aiModelId);
if (aiModel.isPresent()) {
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
AiModelUpdateMsg aiModelUpdateMsg = EdgeMsgConstructorUtils.constructAiModelUpdatedMsg(msgType, aiModel.get());
return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAiModelUpdateMsg(aiModelUpdateMsg)
.build();
}
}
case DELETED -> {
AiModelUpdateMsg aiModelUpdateMsg = EdgeMsgConstructorUtils.constructAiModelDeleteMsg(aiModelId);
return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addAiModelUpdateMsg(aiModelUpdateMsg)
.build();
}
}
return null;
}
@Override
public EdgeEventType getEdgeEventType() {
return EdgeEventType.AI_MODEL;
}
private void processAiModel(TenantId tenantId, AiModelId aiModelId, AiModelUpdateMsg aiModelUpdateMsg, Edge edge) {
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateAiModel(tenantId, aiModelId, aiModelUpdateMsg);
Boolean wasCreated = resultPair.getFirst();
if (wasCreated) {
pushAiModelCreatedEventToRuleEngine(tenantId, edge, aiModelId);
}
Boolean nameWasUpdated = resultPair.getSecond();
if (nameWasUpdated) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.AI_MODEL, EdgeEventActionType.UPDATED, aiModelId, null);
}
}
private void pushAiModelCreatedEventToRuleEngine(TenantId tenantId, Edge edge, AiModelId aiModelId) {
try {
Optional<AiModel> aiModel = edgeCtx.getAiModelService().findAiModelById(tenantId, aiModelId);
if (aiModel.isPresent()) {
String aiModelAsString = JacksonUtil.toString(aiModel.get());
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, edge.getCustomerId());
pushEntityEventToRuleEngine(tenantId, aiModelId, edge.getCustomerId(), TbMsgType.ENTITY_CREATED, aiModelAsString, msgMetaData);
} else {
log.warn("[{}][{}] Failed to find aiModel", tenantId, aiModelId);
}
} catch (Exception e) {
log.warn("[{}][{}] Failed to push aiModel action to rule engine: {}", tenantId, aiModelId, TbMsgType.ENTITY_CREATED.name(), e);
}
}
}

View File

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.ai;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
public interface AiModelProcessor extends EdgeProcessor {
ListenableFuture<Void> processAiModelMsgFromEdge(TenantId tenantId, Edge edge, AiModelUpdateMsg aiModelUpdateMsg);
}

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.ai;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.ai.AiModel;
import org.thingsboard.server.common.data.id.AiModelId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.util.Optional;
@Slf4j
public abstract class BaseAiModelProcessor extends BaseEdgeProcessor {
@Autowired
private DataValidator<AiModel> aiModelValidator;
protected Pair<Boolean, Boolean> saveOrUpdateAiModel(TenantId tenantId, AiModelId aiModelId, AiModelUpdateMsg aiModelUpdateMsg) {
boolean isCreated = false;
boolean isNameUpdated = false;
try {
AiModel aiModel = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true);
if (aiModel == null) {
throw new RuntimeException("[{" + tenantId + "}] aiModelUpdateMsg {" + aiModelUpdateMsg + " } cannot be converted to aiModel");
}
Optional<AiModel> aiModelById = edgeCtx.getAiModelService().findAiModelById(tenantId, aiModelId);
if (aiModelById.isEmpty()) {
aiModel.setCreatedTime(Uuids.unixTimestamp(aiModelId.getId()));
isCreated = true;
aiModel.setId(null);
} else {
aiModel.setId(aiModelId);
}
String aiModelName = aiModel.getName();
Optional<AiModel> aiModelByName = edgeCtx.getAiModelService().findAiModelByTenantIdAndName(aiModel.getTenantId(), aiModelName);
if (aiModelByName.isPresent() && !aiModelByName.get().getId().equals(aiModelId)) {
aiModelName = aiModelName + "_" + StringUtils.randomAlphabetic(15);
log.warn("[{}] aiModel with name {} already exists. Renaming aiModel name to {}",
tenantId, aiModel.getName(), aiModelByName.get().getName());
isNameUpdated = true;
}
aiModel.setName(aiModelName);
aiModelValidator.validate(aiModel, AiModel::getTenantId);
if (isCreated) {
aiModel.setId(aiModelId);
}
edgeCtx.getAiModelService().save(aiModel, false);
} catch (Exception e) {
log.error("[{}] Failed to process aiModel update msg [{}]", tenantId, aiModelUpdateMsg, e);
throw e;
}
return Pair.of(isCreated, isNameUpdated);
}
}

View File

@ -0,0 +1,197 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.edge;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ai.AiModel;
import org.thingsboard.server.common.data.ai.model.chat.OpenAiChatModelConfig;
import org.thingsboard.server.common.data.ai.provider.OpenAiProviderConfig;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UplinkResponseMsg;
import java.util.Optional;
import java.util.UUID;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@DaoSqlTest
public class AiModelEdgeTest extends AbstractEdgeTest {
private static final String DEFAULT_AI_MODEL_NAME = "Edge Test AiModel";
private static final String UPDATED_AI_MODEL_NAME = "Updated Edge Test AiModel";
@Test
public void testAiModel_create_update_delete() throws Exception {
// create AiModel
AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME);
edgeImitator.expectMessageAmount(1);
AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class);
Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg);
AiModelUpdateMsg aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage;
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType());
Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB());
Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB());
AiModel aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true);
Assert.assertNotNull(aiModelFromMsg);
Assert.assertEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName());
Assert.assertEquals(savedAiModel.getTenantId(), aiModelFromMsg.getTenantId());
// update AiModel
edgeImitator.expectMessageAmount(1);
savedAiModel.setName(UPDATED_AI_MODEL_NAME);
savedAiModel = doPost("/api/ai/model", savedAiModel, AiModel.class);
Assert.assertTrue(edgeImitator.waitForMessages());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg);
aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage;
aiModelFromMsg = JacksonUtil.fromString(aiModelUpdateMsg.getEntity(), AiModel.class, true);
Assert.assertNotNull(aiModelFromMsg);
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType());
Assert.assertEquals(UPDATED_AI_MODEL_NAME, aiModelFromMsg.getName());
// delete AiModel
edgeImitator.expectMessageAmount(1);
doDelete("/api/ai/model/" + savedAiModel.getUuidId())
.andExpect(status().isOk());
Assert.assertTrue(edgeImitator.waitForMessages());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof AiModelUpdateMsg);
aiModelUpdateMsg = (AiModelUpdateMsg) latestMessage;
Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, aiModelUpdateMsg.getMsgType());
Assert.assertEquals(savedAiModel.getUuidId().getMostSignificantBits(), aiModelUpdateMsg.getIdMSB());
Assert.assertEquals(savedAiModel.getUuidId().getLeastSignificantBits(), aiModelUpdateMsg.getIdLSB());
}
@Test
public void testSendAiModelToCloud() throws Exception {
AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME);
UUID uuid = Uuids.timeBased();
UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName());
}
@Test
public void testUpdateAiModelNameOnCloud() throws Exception {
AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME);
UUID uuid = Uuids.timeBased();
UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
checkAiModelOnCloud(uplinkMsg, uuid, aiModel.getName());
aiModel.setName(UPDATED_AI_MODEL_NAME);
UplinkMsg updatedUplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
checkAiModelOnCloud(updatedUplinkMsg, uuid, aiModel.getName());
}
@Test
public void testAiModelToCloudWithNameThatAlreadyExistsOnCloud() throws Exception {
AiModel aiModel = createSimpleAiModel(DEFAULT_AI_MODEL_NAME);
edgeImitator.expectMessageAmount(1);
AiModel savedAiModel = doPost("/api/ai/model", aiModel, AiModel.class);
Assert.assertTrue(edgeImitator.waitForMessages());
UUID uuid = Uuids.timeBased();
UplinkMsg uplinkMsg = getUplinkMsg(uuid, aiModel, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsg);
Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages());
Optional<AiModelUpdateMsg> aiModelUpdateMsgOpt = edgeImitator.findMessageByType(AiModelUpdateMsg.class);
Assert.assertTrue(aiModelUpdateMsgOpt.isPresent());
AiModelUpdateMsg latestAiModelUpdateMsg = aiModelUpdateMsgOpt.get();
AiModel aiModelFromMsg = JacksonUtil.fromString(latestAiModelUpdateMsg.getEntity(), AiModel.class, true);
Assert.assertNotNull(aiModelFromMsg);
Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromMsg.getName());
Assert.assertNotEquals(savedAiModel.getUuidId(), uuid);
AiModel aiModelFromCloud = doGet("/api/ai/model/" + uuid, AiModel.class);
Assert.assertNotNull(aiModelFromCloud);
Assert.assertNotEquals(DEFAULT_AI_MODEL_NAME, aiModelFromCloud.getName());
}
private AiModel createSimpleAiModel(String name) {
AiModel aiModel = new AiModel();
aiModel.setTenantId(tenantId);
aiModel.setName(name);
aiModel.setConfiguration(OpenAiChatModelConfig.builder()
.providerConfig(new OpenAiProviderConfig("test-api-key"))
.modelId("gpt-4o")
.temperature(0.5)
.topP(0.3)
.frequencyPenalty(0.1)
.presencePenalty(0.2)
.maxOutputTokens(1000)
.timeoutSeconds(60)
.maxRetries(2)
.build());
return aiModel;
}
private UplinkMsg getUplinkMsg(UUID uuid, AiModel aiModel, UpdateMsgType updateMsgType) throws InvalidProtocolBufferException {
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
AiModelUpdateMsg.Builder aiModelUpdateMsgBuilder = AiModelUpdateMsg.newBuilder();
aiModelUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
aiModelUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
aiModelUpdateMsgBuilder.setEntity(JacksonUtil.toString(aiModel));
aiModelUpdateMsgBuilder.setMsgType(updateMsgType);
testAutoGeneratedCodeByProtobuf(aiModelUpdateMsgBuilder);
uplinkMsgBuilder.addAiModelUpdateMsg(aiModelUpdateMsgBuilder.build());
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
return uplinkMsgBuilder.build();
}
private void checkAiModelOnCloud(UplinkMsg uplinkMsg, UUID uuid, String resourceTitle) throws Exception {
edgeImitator.expectResponsesAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsg);
Assert.assertTrue(edgeImitator.waitForResponses());
UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg();
Assert.assertTrue(latestResponseMsg.getSuccess());
AiModel aiModel = doGet("/api/ai/model/" + uuid, AiModel.class);
Assert.assertNotNull(aiModel);
Assert.assertEquals(resourceTitle, aiModel.getName());
}
}

View File

@ -29,6 +29,7 @@ import org.thingsboard.edge.rpc.EdgeGrpcClient;
import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient;
import org.thingsboard.server.controller.AbstractWebTest; import org.thingsboard.server.controller.AbstractWebTest;
import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AiModelUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
@ -358,6 +359,11 @@ public class EdgeImitator {
result.add(saveDownlinkMsg(calculatedFieldUpdateMsg)); result.add(saveDownlinkMsg(calculatedFieldUpdateMsg));
} }
} }
if (downlinkMsg.getAiModelUpdateMsgCount() > 0) {
for (AiModelUpdateMsg aiModelUpdateMsg : downlinkMsg.getAiModelUpdateMsgList()) {
result.add(saveDownlinkMsg(aiModelUpdateMsg));
}
}
if (downlinkMsg.hasEdgeConfiguration()) { if (downlinkMsg.hasEdgeConfiguration()) {
result.add(saveDownlinkMsg(downlinkMsg.getEdgeConfiguration())); result.add(saveDownlinkMsg(downlinkMsg.getEdgeConfiguration()));
} }

View File

@ -29,6 +29,8 @@ public interface AiModelService extends EntityDaoService {
AiModel save(AiModel model); AiModel save(AiModel model);
AiModel save(AiModel model, boolean doValidate);
Optional<AiModel> findAiModelById(TenantId tenantId, AiModelId modelId); Optional<AiModel> findAiModelById(TenantId tenantId, AiModelId modelId);
PageData<AiModel> findAiModelsByTenantId(TenantId tenantId, PageLink pageLink); PageData<AiModel> findAiModelsByTenantId(TenantId tenantId, PageLink pageLink);
@ -37,6 +39,8 @@ public interface AiModelService extends EntityDaoService {
FluentFuture<Optional<AiModel>> findAiModelByTenantIdAndIdAsync(TenantId tenantId, AiModelId modelId); FluentFuture<Optional<AiModel>> findAiModelByTenantIdAndIdAsync(TenantId tenantId, AiModelId modelId);
Optional<AiModel> findAiModelByTenantIdAndName(TenantId tenantId, String name);
boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId); boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId);
} }

View File

@ -47,7 +47,8 @@ public enum EdgeEventType {
TB_RESOURCE(true, EntityType.TB_RESOURCE), TB_RESOURCE(true, EntityType.TB_RESOURCE),
OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT), OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT),
DOMAIN(true, EntityType.DOMAIN), DOMAIN(true, EntityType.DOMAIN),
CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD); CALCULATED_FIELD(false, EntityType.CALCULATED_FIELD),
AI_MODEL(true, EntityType.AI_MODEL);
private final boolean allEdgesRelated; private final boolean allEdgesRelated;

View File

@ -113,6 +113,7 @@ public class EntityIdFactory {
case OAUTH2_CLIENT -> new OAuth2ClientId(uuid); case OAUTH2_CLIENT -> new OAuth2ClientId(uuid);
case DOMAIN -> new DomainId(uuid); case DOMAIN -> new DomainId(uuid);
case CALCULATED_FIELD -> new CalculatedFieldId(uuid); case CALCULATED_FIELD -> new CalculatedFieldId(uuid);
case AI_MODEL -> new AiModelId(uuid);
default -> throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); default -> throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!");
}; };
} }

View File

@ -136,7 +136,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
.setConnectRequestMsg(ConnectRequestMsg.newBuilder() .setConnectRequestMsg(ConnectRequestMsg.newBuilder()
.setEdgeRoutingKey(edgeKey) .setEdgeRoutingKey(edgeKey)
.setEdgeSecret(edgeSecret) .setEdgeSecret(edgeSecret)
.setEdgeVersion(EdgeVersion.V_4_2_0) .setEdgeVersion(EdgeVersion.V_4_3_0)
.setMaxInboundMessageSize(maxInboundMessageSize) .setMaxInboundMessageSize(maxInboundMessageSize)
.build()) .build())
.build()); .build());

View File

@ -44,6 +44,7 @@ enum EdgeVersion {
V_4_0_0 = 10; V_4_0_0 = 10;
V_4_1_0 = 11; V_4_1_0 = 11;
V_4_2_0 = 12; V_4_2_0 = 12;
V_4_3_0 = 13;
V_LATEST = 999; V_LATEST = 999;
} }
@ -133,6 +134,12 @@ message CalculatedFieldUpdateMsg{
string entity = 4; string entity = 4;
} }
message AiModelUpdateMsg{
UpdateMsgType msgType = 1;
int64 idMSB = 2;
int64 idLSB = 3;
string entity = 4;
}
message EntityDataProto { message EntityDataProto {
int64 entityIdMSB = 1; int64 entityIdMSB = 1;
@ -441,6 +448,7 @@ message UplinkMsg {
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24; repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24;
repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25; repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25;
repeated CalculatedFieldRequestMsg calculatedFieldRequestMsg = 26; repeated CalculatedFieldRequestMsg calculatedFieldRequestMsg = 26;
repeated AiModelUpdateMsg aiModelUpdateMsg = 27;
} }
message UplinkResponseMsg { message UplinkResponseMsg {
@ -491,4 +499,5 @@ message DownlinkMsg {
repeated NotificationTemplateUpdateMsg notificationTemplateUpdateMsg = 33; repeated NotificationTemplateUpdateMsg notificationTemplateUpdateMsg = 33;
repeated OAuth2DomainUpdateMsg oAuth2DomainUpdateMsg = 34; repeated OAuth2DomainUpdateMsg oAuth2DomainUpdateMsg = 34;
repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 35; repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 35;
repeated AiModelUpdateMsg aiModelUpdateMsg = 36;
} }

View File

@ -29,12 +29,15 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.CachedVersionedEntityService; import org.thingsboard.server.dao.entity.CachedVersionedEntityService;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.model.sql.AiModelEntity; import org.thingsboard.server.dao.model.sql.AiModelEntity;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.sql.JpaExecutorService; import org.thingsboard.server.dao.sql.JpaExecutorService;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import static org.thingsboard.server.dao.service.Validator.validatePageLink; import static org.thingsboard.server.dao.service.Validator.validatePageLink;
@ -63,11 +66,23 @@ class AiModelServiceImpl extends CachedVersionedEntityService<AiModelCacheKey, A
@Override @Override
@Transactional @Transactional
public AiModel save(AiModel model) { public AiModel save(AiModel model) {
aiModelValidator.validate(model, AiModel::getTenantId); return save(model, true);
}
@Override
public AiModel save(AiModel aiModel, boolean doValidate) {
AiModel oldAiModel = null;
if (doValidate) {
oldAiModel = aiModelValidator.validate(aiModel, AiModel::getTenantId);
} else if (aiModel.getId() != null) {
oldAiModel = findAiModelById(aiModel.getTenantId(), aiModel.getId()).orElse(null);
}
AiModel savedModel; AiModel savedModel;
try { try {
savedModel = aiModelDao.saveAndFlush(model.getTenantId(), model); savedModel = aiModelDao.saveAndFlush(aiModel.getTenantId(), aiModel);
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedModel.getTenantId()).entityId(savedModel.getId())
.entity(savedModel).oldEntity(oldAiModel).created(oldAiModel == null).broadcastEvent(true).build());
} catch (Exception e) { } catch (Exception e) {
checkConstraintViolation(e, checkConstraintViolation(e,
"ai_model_name_unq_key", "AI model with such name already exist!", "ai_model_name_unq_key", "AI model with such name already exist!",
@ -103,10 +118,15 @@ class AiModelServiceImpl extends CachedVersionedEntityService<AiModelCacheKey, A
return FluentFuture.from(jpaExecutor.submit(() -> findAiModelByTenantIdAndId(tenantId, modelId))); return FluentFuture.from(jpaExecutor.submit(() -> findAiModelByTenantIdAndId(tenantId, modelId)));
} }
@Override
public Optional<AiModel> findAiModelByTenantIdAndName(TenantId tenantId, String name) {
return Optional.ofNullable(aiModelDao.findByTenantIdAndName(tenantId.getId(), name));
}
@Override @Override
@Transactional @Transactional
public boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId) { public boolean deleteByTenantIdAndId(TenantId tenantId, AiModelId modelId) {
return deleteByTenantIdAndIdInternal(tenantId, modelId); return deleteByTenantIdAndIdInternal(tenantId, modelId.getId());
} }
@Override @Override
@ -123,14 +143,21 @@ class AiModelServiceImpl extends CachedVersionedEntityService<AiModelCacheKey, A
@Override @Override
@Transactional @Transactional
public void deleteEntity(TenantId tenantId, EntityId id, boolean force) { public void deleteEntity(TenantId tenantId, EntityId id, boolean force) {
deleteByTenantIdAndIdInternal(tenantId, new AiModelId(id.getId())); deleteByTenantIdAndIdInternal(tenantId, id.getId());
} }
private boolean deleteByTenantIdAndIdInternal(TenantId tenantId, AiModelId modelId) { private boolean deleteByTenantIdAndIdInternal(TenantId tenantId, UUID modelId) {
boolean deleted = aiModelDao.deleteByTenantIdAndId(tenantId, modelId); AiModel aiModel = findAiModelById(tenantId, new AiModelId(modelId)).orElse(null);
if (deleted) { if (aiModel == null) {
publishEvictEvent(new AiModelCacheEvictEvent.Deleted(AiModelCacheKey.of(tenantId, modelId))); return false;
} }
boolean deleted = aiModelDao.deleteByTenantIdAndId(tenantId, aiModel.getId());
if (deleted) {
publishEvictEvent(new AiModelCacheEvictEvent.Deleted(AiModelCacheKey.of(tenantId, aiModel.getId())));
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(aiModel.getId()).entity(aiModel).build());
}
return deleted; return deleted;
} }

View File

@ -74,6 +74,7 @@ import org.thingsboard.server.common.data.UpdateMessage;
import org.thingsboard.server.common.data.UsageInfo; import org.thingsboard.server.common.data.UsageInfo;
import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.UserEmailInfo; import org.thingsboard.server.common.data.UserEmailInfo;
import org.thingsboard.server.common.data.ai.AiModel;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
@ -98,6 +99,7 @@ import org.thingsboard.server.common.data.edge.EdgeInfo;
import org.thingsboard.server.common.data.edge.EdgeInstructions; import org.thingsboard.server.common.data.edge.EdgeInstructions;
import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery; import org.thingsboard.server.common.data.entityview.EntityViewSearchQuery;
import org.thingsboard.server.common.data.id.AiModelId;
import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmCommentId;
import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetId;
@ -4144,6 +4146,29 @@ public class RestClient implements Closeable {
} }
} }
public AiModel saveAiModel(AiModel aiModel) {
return restTemplate.postForEntity(baseURL + "/api/ai/model", aiModel, AiModel.class).getBody();
}
public Optional<AiModel> getAiModel(AiModelId aiModelId) {
try {
ResponseEntity<AiModel> response = restTemplate.getForEntity(
baseURL + "/api/aiModel/{aiModelId}", AiModel.class, aiModelId.getId());
return Optional.ofNullable(response.getBody());
} catch (HttpClientErrorException exception) {
if (exception.getStatusCode() == HttpStatus.NOT_FOUND) {
return Optional.empty();
} else {
throw exception;
}
}
}
public void deleteAiModel(AiModelId aiModelId) {
restTemplate.delete(baseURL + "/api/aiModel/{aiModelId}", aiModelId.getId());
}
private String getTimeUrlParams(TimePageLink pageLink) { private String getTimeUrlParams(TimePageLink pageLink) {
String urlParams = getUrlParams(pageLink); String urlParams = getUrlParams(pageLink);
if (pageLink.getStartTime() != null) { if (pageLink.getStartTime() != null) {

View File

@ -92,7 +92,6 @@ import static org.thingsboard.server.dao.service.ConstraintValidator.validateFie
configClazz = TbAiNodeConfiguration.class, configClazz = TbAiNodeConfiguration.class,
configDirective = "tbExternalNodeAiConfig", configDirective = "tbExternalNodeAiConfig",
iconUrl = "", iconUrl = "",
ruleChainTypes = RuleChainType.CORE,
docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/ai-request/" docUrl = "https://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/ai-request/"
) )
public final class TbAiNode extends TbAbstractExternalNode implements TbNode { public final class TbAiNode extends TbAbstractExternalNode implements TbNode {