Refactoring - introduced new method - findActiveEdges for all tenants
This commit is contained in:
parent
69bfe737b7
commit
2478acc42d
@ -65,7 +65,6 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@ -144,23 +143,17 @@ public abstract class BaseEdgeProcessor implements EdgeProcessor {
|
|||||||
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
|
protected ListenableFuture<Void> processActionForAllEdges(TenantId tenantId, EdgeEventType type,
|
||||||
EdgeEventActionType actionType, EntityId entityId,
|
EdgeEventActionType actionType, EntityId entityId,
|
||||||
JsonNode body, EdgeId sourceEdgeId) {
|
JsonNode body, EdgeId sourceEdgeId) {
|
||||||
|
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||||
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
|
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
|
||||||
PageDataIterable<TenantId> tenantIds = new PageDataIterable<>(link -> edgeCtx.getTenantService().findTenantsIds(link), 500);
|
PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findActiveEdges(link), 1024);
|
||||||
for (TenantId tenantId1 : tenantIds) {
|
for (Edge edge : edges) {
|
||||||
try {
|
futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), type, actionType, entityId, body));
|
||||||
List<ListenableFuture<Void>> sysTenantFutures = processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, body, sourceEdgeId);
|
|
||||||
for (ListenableFuture<Void> future : sysTenantFutures) {
|
|
||||||
future.get(10, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Failed to process action for all edges by SYS_TENANT_ID. Failed tenantId = [{}]", tenantId1, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
} else {
|
} else {
|
||||||
List<ListenableFuture<Void>> tenantFutures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
|
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null, sourceEdgeId);
|
||||||
return Futures.transform(Futures.allAsList(tenantFutures), voids -> null, dbCallbackExecutorService);
|
|
||||||
}
|
}
|
||||||
|
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
|
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
|
||||||
|
|||||||
@ -47,6 +47,8 @@ public interface EdgeService extends EntityDaoService {
|
|||||||
|
|
||||||
Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey);
|
Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey);
|
||||||
|
|
||||||
|
PageData<Edge> findActiveEdges(PageLink pageLink);
|
||||||
|
|
||||||
Edge saveEdge(Edge edge);
|
Edge saveEdge(Edge edge);
|
||||||
|
|
||||||
Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId);
|
Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId);
|
||||||
|
|||||||
@ -41,6 +41,8 @@ public interface EdgeDao extends Dao<Edge>, TenantEntityDao<Edge> {
|
|||||||
|
|
||||||
EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId);
|
EdgeInfo findEdgeInfoById(TenantId tenantId, UUID edgeId);
|
||||||
|
|
||||||
|
PageData<Edge> findActiveEdges(PageLink pageLink);
|
||||||
|
|
||||||
PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink);
|
PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink);
|
||||||
|
|
||||||
PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink);
|
PageData<Edge> findEdgesByTenantId(UUID tenantId, PageLink pageLink);
|
||||||
|
|||||||
@ -192,6 +192,13 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
|
|||||||
return edgeDao.findByRoutingKey(tenantId.getId(), routingKey);
|
return edgeDao.findByRoutingKey(tenantId.getId(), routingKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PageData<Edge> findActiveEdges(PageLink pageLink) {
|
||||||
|
log.trace("Executing findActiveEdges [{}]", pageLink);
|
||||||
|
Validator.validatePageLink(pageLink);
|
||||||
|
return edgeDao.findActiveEdges(pageLink);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Edge saveEdge(Edge edge) {
|
public Edge saveEdge(Edge edge) {
|
||||||
log.trace("Executing saveEdge [{}]", edge);
|
log.trace("Executing saveEdge [{}]", edge);
|
||||||
|
|||||||
@ -44,6 +44,18 @@ public interface EdgeRepository extends JpaRepository<EdgeEntity, UUID> {
|
|||||||
"WHERE d.id = :edgeId")
|
"WHERE d.id = :edgeId")
|
||||||
EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId);
|
EdgeInfoEntity findEdgeInfoById(@Param("edgeId") UUID edgeId);
|
||||||
|
|
||||||
|
@Query(value = "SELECT ee.id, ee.created_time, ee.additional_info, ee.customer_id, " +
|
||||||
|
"ee.root_rule_chain_id, ee.type, ee.name, ee.label, ee.routing_key, " +
|
||||||
|
"ee.secret, ee.tenant_id, ee.version " +
|
||||||
|
"FROM edge ee " +
|
||||||
|
"JOIN attribute_kv ON ee.id = attribute_kv.entity_id " +
|
||||||
|
"JOIN key_dictionary ON attribute_kv.attribute_key = key_dictionary.key_id " +
|
||||||
|
"WHERE attribute_kv.bool_v = true AND key_dictionary.key = 'active' " +
|
||||||
|
"AND (:textSearch IS NULL OR ee.name ILIKE CONCAT('%', :textSearch, '%')) " +
|
||||||
|
"ORDER BY ee.id", nativeQuery = true)
|
||||||
|
Page<EdgeEntity> findActiveEdges(@Param("textSearch") String textSearch,
|
||||||
|
Pageable pageable);
|
||||||
|
|
||||||
@Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
|
@Query("SELECT d.id FROM EdgeEntity d WHERE d.tenantId = :tenantId " +
|
||||||
"AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
|
"AND (:textSearch IS NULL OR ilike(d.name, CONCAT('%', :textSearch, '%')) = true)")
|
||||||
Page<UUID> findIdsByTenantId(@Param("tenantId") UUID tenantId,
|
Page<UUID> findIdsByTenantId(@Param("tenantId") UUID tenantId,
|
||||||
|
|||||||
@ -66,6 +66,14 @@ public class JpaEdgeDao extends JpaAbstractDao<EdgeEntity, Edge> implements Edge
|
|||||||
return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId));
|
return DaoUtil.getData(edgeRepository.findEdgeInfoById(edgeId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PageData<Edge> findActiveEdges(PageLink pageLink) {
|
||||||
|
return DaoUtil.toPageData(
|
||||||
|
edgeRepository.findActiveEdges(
|
||||||
|
pageLink.getTextSearch(),
|
||||||
|
DaoUtil.toPageable(pageLink)));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) {
|
public PageData<EdgeId> findEdgeIdsByTenantId(UUID tenantId, PageLink pageLink) {
|
||||||
return DaoUtil.pageToPageData(
|
return DaoUtil.pageToPageData(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user