CalculatedField functionality support for Edge

- refactoring
This commit is contained in:
Yevhenii 2025-06-25 12:11:51 +03:00
parent 2414b97923
commit 3ae97e0d7e
8 changed files with 12 additions and 50 deletions

View File

@ -62,7 +62,7 @@ import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor;
import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor;
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.asset.profile.AssetProfileEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.profile.AssetProfileEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.calculated.CalculatedFieldProcessor; import org.thingsboard.server.service.edge.rpc.processor.cf.CalculatedFieldProcessor;
import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.device.profile.DeviceProfileEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.profile.DeviceProfileEdgeProcessor;

View File

@ -222,7 +222,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
if (edgeId != null) { if (edgeId != null) {
return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body);
} else { } else {
return processNotificationToRelatedEdges(tenantId, entityId, type, actionType, originatorEdgeId); return processNotificationToRelatedEdges(tenantId, entityId, entityId, type, actionType, originatorEdgeId);
} }
case DELETED: case DELETED:
EdgeEventActionType deleted = EdgeEventActionType.DELETED; EdgeEventActionType deleted = EdgeEventActionType.DELETED;
@ -260,11 +260,11 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
} }
} }
private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, protected ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId ownerEntityId, EntityId entityId, EdgeEventType type,
EdgeEventActionType actionType, EdgeId sourceEdgeId) { EdgeEventActionType actionType, EdgeId sourceEdgeId) {
List<ListenableFuture<Void>> futures = new ArrayList<>(); List<ListenableFuture<Void>> futures = new ArrayList<>();
PageDataIterableByTenantIdEntityId<EdgeId> edgeIds = PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, entityId, RELATED_EDGES_CACHE_ITEMS); new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, ownerEntityId, RELATED_EDGES_CACHE_ITEMS);
for (EdgeId relatedEdgeId : edgeIds) { for (EdgeId relatedEdgeId : edgeIds) {
if (!relatedEdgeId.equals(sourceEdgeId)) { if (!relatedEdgeId.equals(sourceEdgeId)) {
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));

View File

@ -13,33 +13,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.edge.rpc.processor.calculated; package org.thingsboard.server.service.edge.rpc.processor.cf;
import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair; import org.springframework.data.util.Pair;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.util.ArrayList;
import java.util.List;
import static org.thingsboard.server.dao.edge.BaseRelatedEdgesService.RELATED_EDGES_CACHE_ITEMS;
@Slf4j @Slf4j
public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
@ -88,23 +76,4 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
return Pair.of(isCreated, isNameUpdated); return Pair.of(isCreated, isNameUpdated);
} }
protected ListenableFuture<Void> pushEventToAllRelatedEdges(TenantId tenantId, EntityId calculatedFieldOwnerId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) {
List<ListenableFuture<Void>> futures = new ArrayList<>();
PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, calculatedFieldOwnerId, RELATED_EDGES_CACHE_ITEMS);
for (EdgeId relatedEdgeId : edgeIds) {
if (!relatedEdgeId.equals(sourceEdgeId)) {
futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
}
}
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
}
protected ListenableFuture<Void> pushEventToAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
return switch (actionType) {
case ADDED, UPDATED, DELETED -> processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
default -> Futures.immediateFuture(null);
};
}
} }

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.edge.rpc.processor.calculated; package org.thingsboard.server.service.edge.rpc.processor.cf;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -134,9 +134,9 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i
return edgeId != null ? return edgeId != null ?
saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) : saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) :
pushEventToAllRelatedEdges(tenantId, calculatedFieldOwnerId, entityId, type, actionType, originatorEdgeId); processNotificationToRelatedEdges(tenantId, calculatedFieldOwnerId, entityId, type, actionType, originatorEdgeId);
} else { } else {
return pushEventToAllEdges(tenantId, type, actionType, entityId, originatorEdgeId); return processActionForAllEdges(tenantId, type, actionType, entityId, null, originatorEdgeId);
} }
default: default:
return super.processEntityNotification(tenantId, edgeNotificationMsg); return super.processEntityNotification(tenantId, edgeNotificationMsg);

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.service.edge.rpc.processor.calculated; package org.thingsboard.server.service.edge.rpc.processor.cf;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;

View File

@ -78,7 +78,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors;
@Service @Service
@TbCoreComponent @TbCoreComponent
@ -322,11 +321,10 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
return saveEdgeEvent(tenantId, edgeId, EdgeEventType.CALCULATED_FIELD, return saveEdgeEvent(tenantId, edgeId, EdgeEventType.CALCULATED_FIELD,
EdgeEventActionType.ADDED, calculatedField.getId(), JacksonUtil.valueToTree(calculatedField)); EdgeEventActionType.ADDED, calculatedField.getId(), JacksonUtil.valueToTree(calculatedField));
} catch (Exception e) { } catch (Exception e) {
String errMsg = String.format("[%s][%s] Exception during loading calculatedField [%s] to edge on sync!", tenantId, edgeId, calculatedField); log.error("[{}][{}] Exception during loading calculatedField [{}] to edge on sync!", tenantId, edgeId, calculatedField, e);
log.error(errMsg, e);
return Futures.immediateFailedFuture(e); return Futures.immediateFailedFuture(e);
} }
}).collect(Collectors.toList()); }).toList();
return Futures.transform( return Futures.transform(
Futures.allAsList(futures), Futures.allAsList(futures),

View File

@ -58,20 +58,17 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
@Override @Override
public CalculatedField save(CalculatedField calculatedField) { public CalculatedField save(CalculatedField calculatedField) {
CalculatedField oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); CalculatedField oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
return doSave(calculatedField, oldCalculatedField); return doSave(calculatedField, oldCalculatedField);
} }
@Override @Override
public CalculatedField save(CalculatedField calculatedField, boolean doValidate) { public CalculatedField save(CalculatedField calculatedField, boolean doValidate) {
CalculatedField oldCalculatedField = null; CalculatedField oldCalculatedField = null;
if (doValidate) { if (doValidate) {
oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
} else if (calculatedField.getId() != null) { } else if (calculatedField.getId() != null) {
oldCalculatedField = findById(calculatedField.getTenantId(), calculatedField.getId()); oldCalculatedField = findById(calculatedField.getTenantId(), calculatedField.getId());
} }
return doSave(calculatedField, oldCalculatedField); return doSave(calculatedField, oldCalculatedField);
} }
@ -106,7 +103,6 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
public CalculatedField findByEntityIdAndName(EntityId entityId, String name) { public CalculatedField findByEntityIdAndName(EntityId entityId, String name) {
log.trace("Executing findByEntityIdAndName [{}], calculatedFieldName[{}]", entityId, name); log.trace("Executing findByEntityIdAndName [{}], calculatedFieldName[{}]", entityId, name);
validateId(entityId.getId(), id -> INCORRECT_ENTITY_ID + id); validateId(entityId.getId(), id -> INCORRECT_ENTITY_ID + id);
return calculatedFieldDao.findByEntityIdAndName(entityId, name); return calculatedFieldDao.findByEntityIdAndName(entityId, name);
} }

View File

@ -524,7 +524,6 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
case ENTITY_VIEW: case ENTITY_VIEW:
case DASHBOARD: case DASHBOARD:
case RULE_CHAIN: case RULE_CHAIN:
case CALCULATED_FIELD:
return relatedEdgesService.findEdgeIdsByEntityId(tenantId, entityId, pageLink); return relatedEdgesService.findEdgeIdsByEntityId(tenantId, entityId, pageLink);
case USER: case USER:
User userById = userService.findUserById(tenantId, new UserId(entityId.getId())); User userById = userService.findUserById(tenantId, new UserId(entityId.getId()));