diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index abdc89ebd9..1193f935a0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -62,7 +62,7 @@ import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.profile.AssetProfileEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.calculated.CalculatedFieldProcessor; +import org.thingsboard.server.service.edge.rpc.processor.cf.CalculatedFieldProcessor; import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.profile.DeviceProfileEdgeProcessor; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index e49f22ed7e..dc16c5b229 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -222,7 +222,7 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { if (edgeId != null) { return saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body); } else { - return processNotificationToRelatedEdges(tenantId, entityId, type, actionType, originatorEdgeId); + return processNotificationToRelatedEdges(tenantId, entityId, entityId, type, actionType, originatorEdgeId); } case DELETED: EdgeEventActionType deleted = EdgeEventActionType.DELETED; @@ -260,11 +260,11 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor { } } - private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, - EdgeEventActionType actionType, EdgeId sourceEdgeId) { + protected ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId ownerEntityId, EntityId entityId, EdgeEventType type, + EdgeEventActionType actionType, EdgeId sourceEdgeId) { List> futures = new ArrayList<>(); PageDataIterableByTenantIdEntityId edgeIds = - new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, entityId, RELATED_EDGES_CACHE_ITEMS); + new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, ownerEntityId, RELATED_EDGES_CACHE_ITEMS); for (EdgeId relatedEdgeId : edgeIds) { if (!relatedEdgeId.equals(sourceEdgeId)) { futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/BaseCalculatedFieldProcessor.java similarity index 66% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/BaseCalculatedFieldProcessor.java index a7781fe55d..4ef6ec7ba2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/BaseCalculatedFieldProcessor.java @@ -13,33 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor.calculated; +package org.thingsboard.server.service.edge.rpc.processor.cf; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.util.Pair; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.CalculatedField; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; -import java.util.ArrayList; -import java.util.List; - -import static org.thingsboard.server.dao.edge.BaseRelatedEdgesService.RELATED_EDGES_CACHE_ITEMS; - @Slf4j public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { @@ -88,23 +76,4 @@ public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { return Pair.of(isCreated, isNameUpdated); } - protected ListenableFuture pushEventToAllRelatedEdges(TenantId tenantId, EntityId calculatedFieldOwnerId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { - List> futures = new ArrayList<>(); - PageDataIterableByTenantIdEntityId edgeIds = - new PageDataIterableByTenantIdEntityId<>(edgeCtx.getEdgeService()::findRelatedEdgeIdsByEntityId, tenantId, calculatedFieldOwnerId, RELATED_EDGES_CACHE_ITEMS); - for (EdgeId relatedEdgeId : edgeIds) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); - } - } - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - } - - protected ListenableFuture pushEventToAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { - return switch (actionType) { - case ADDED, UPDATED, DELETED -> processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); - default -> Futures.immediateFuture(null); - }; - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldEdgeProcessor.java similarity index 96% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldEdgeProcessor.java index a0212bb4df..cab4b5ecc1 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor.calculated; +package org.thingsboard.server.service.edge.rpc.processor.cf; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; @@ -134,9 +134,9 @@ public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor i return edgeId != null ? saveEdgeEvent(tenantId, edgeId, type, actionType, entityId, body) : - pushEventToAllRelatedEdges(tenantId, calculatedFieldOwnerId, entityId, type, actionType, originatorEdgeId); + processNotificationToRelatedEdges(tenantId, calculatedFieldOwnerId, entityId, type, actionType, originatorEdgeId); } else { - return pushEventToAllEdges(tenantId, type, actionType, entityId, originatorEdgeId); + return processActionForAllEdges(tenantId, type, actionType, entityId, null, originatorEdgeId); } default: return super.processEntityNotification(tenantId, edgeNotificationMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldProcessor.java similarity index 94% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldProcessor.java index ba9c8b27e1..d21af858f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/cf/CalculatedFieldProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor.calculated; +package org.thingsboard.server.service.edge.rpc.processor.cf; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.Edge; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 7cdf916439..66ff05b45a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -78,7 +78,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.stream.Collectors; @Service @TbCoreComponent @@ -322,11 +321,10 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { return saveEdgeEvent(tenantId, edgeId, EdgeEventType.CALCULATED_FIELD, EdgeEventActionType.ADDED, calculatedField.getId(), JacksonUtil.valueToTree(calculatedField)); } catch (Exception e) { - String errMsg = String.format("[%s][%s] Exception during loading calculatedField [%s] to edge on sync!", tenantId, edgeId, calculatedField); - log.error(errMsg, e); + log.error("[{}][{}] Exception during loading calculatedField [{}] to edge on sync!", tenantId, edgeId, calculatedField, e); return Futures.immediateFailedFuture(e); } - }).collect(Collectors.toList()); + }).toList(); return Futures.transform( Futures.allAsList(futures), diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index dc6abbde8b..c0cb886747 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -58,20 +58,17 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements @Override public CalculatedField save(CalculatedField calculatedField) { CalculatedField oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); - return doSave(calculatedField, oldCalculatedField); } @Override public CalculatedField save(CalculatedField calculatedField, boolean doValidate) { CalculatedField oldCalculatedField = null; - if (doValidate) { oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); } else if (calculatedField.getId() != null) { oldCalculatedField = findById(calculatedField.getTenantId(), calculatedField.getId()); } - return doSave(calculatedField, oldCalculatedField); } @@ -106,7 +103,6 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements public CalculatedField findByEntityIdAndName(EntityId entityId, String name) { log.trace("Executing findByEntityIdAndName [{}], calculatedFieldName[{}]", entityId, name); validateId(entityId.getId(), id -> INCORRECT_ENTITY_ID + id); - return calculatedFieldDao.findByEntityIdAndName(entityId, name); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index ebcfd7678a..0655d05572 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -524,7 +524,6 @@ public class EdgeServiceImpl extends AbstractCachedEntityService