From 107c5c2ef4f33a434341e32986e117a4fb2a3232 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 1 Apr 2025 10:22:29 +0300 Subject: [PATCH] added initialization of cache when partiton change event --- ...alculatedFieldManagerMessageProcessor.java | 15 +++++ .../server/actors/tenant/TenantActor.java | 1 + .../cf/DefaultCalculatedFieldInitService.java | 10 +++- ...aultCalculatedFieldEntityProfileCache.java | 56 ++++++++++++++++--- .../server/dao/cf/CalculatedFieldService.java | 4 ++ .../dao/cf/BaseCalculatedFieldService.java | 13 +++++ .../server/dao/cf/CalculatedFieldLinkDao.java | 2 + .../sql/cf/CalculatedFieldLinkRepository.java | 2 + .../dao/sql/cf/JpaCalculatedFieldLinkDao.java | 5 ++ 9 files changed, 97 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 27d3ce315a..521503a82e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -513,6 +513,21 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) { + initCalculatedFields(); ctx.broadcastToChildren(msg, true); } + + public void initCalculatedFields() { + cfDaoService.findCalculatedFieldsByTenantId(tenantId).forEach(cf -> { + try { + onFieldInitMsg(new CalculatedFieldInitMsg(cf.getTenantId(), cf)); + } catch (CalculatedFieldException e) { + throw new RuntimeException(e); + } + }); + cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId).forEach(cfLink -> { + onLinkInitMsg(new CalculatedFieldLinkInitMsg(cfLink.getTenantId(), cfLink)); + }); + } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 728f715af4..b42d91a4de 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -274,6 +274,7 @@ public class TenantActor extends RuleChainManagerActor { () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), () -> true); + cfActor.tellWithHighPriority(msg); } catch (Exception e) { log.info("[{}] Failed to init CF Actor.", tenantId, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java index cc3022fa29..5f02be0cde 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache; @@ -37,6 +38,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer private final CalculatedFieldEntityProfileCache entityProfileCache; private final AssetService assetService; private final DeviceService deviceService; + private final PartitionService partitionService; @Value("${calculated_fields.init_fetch_pack_size:50000}") @Getter @@ -48,7 +50,9 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : deviceIdInfos) { log.trace("Processing device record: {}", idInfo); try { - entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); + if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) { + entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); + } } catch (Exception e) { log.error("Failed to process device record: {}", idInfo, e); } @@ -57,7 +61,9 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer for (ProfileEntityIdInfo idInfo : assetIdInfos) { log.trace("Processing asset record: {}", idInfo); try { - entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); + if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) { + entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId()); + } } catch (Exception e) { log.error("Failed to process asset record: {}", idInfo, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java index 8059b3a825..cefdd012d8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/cache/DefaultCalculatedFieldEntityProfileCache.java @@ -15,20 +15,26 @@ */ package org.thingsboard.server.service.cf.cache; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import java.util.Collection; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -42,13 +48,16 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent private static final Integer UNKNOWN = 0; private final ConcurrentMap tenantCache = new ConcurrentHashMap<>(); private final PartitionService partitionService; + private final AssetService assetService; + private final DeviceService deviceService; + + @Value("${calculated_fields.init_fetch_pack_size:50000}") + @Getter + private int initFetchPackSize; @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { - event.getCfPartitions().forEach(tpi -> { - Optional tenantIdOpt = tpi.getTenantId(); - tenantIdOpt.ifPresent(tenantId -> tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())); - }); + event.getCfPartitions().forEach(tpi -> tpi.getTenantId().ifPresent(this::initCacheForNewTenant)); } @Override @@ -58,10 +67,14 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent @Override public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { - var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); - //TODO: make this method atomic; - cache.remove(oldProfileId, entityId); - cache.add(newProfileId, entityId); + tenantCache.compute(tenantId, (id, cache) -> { + if (cache == null) { + cache = new TenantEntityProfileCache(); + } + cache.remove(oldProfileId, entityId); + cache.add(newProfileId, entityId); + return cache; + }); } @Override @@ -91,4 +104,29 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent return tpi.getPartition().orElse(UNKNOWN); } + private void initCacheForNewTenant(TenantId tenantId) { + PageDataIterable devices = new PageDataIterable<>(pageLink -> deviceService.findDevicesByTenantId(tenantId, pageLink), initFetchPackSize); + for (Device device : devices) { + log.trace("Processing device record: {}", device); + try { + if (partitionService.isManagedByCurrentService(device.getTenantId())) { + add(device.getTenantId(), device.getDeviceProfileId(), device.getId()); + } + } catch (Exception e) { + log.error("Failed to process device record: {}", device, e); + } + } + PageDataIterable assets = new PageDataIterable<>(pageLink -> assetService.findAssetsByTenantId(tenantId, pageLink), initFetchPackSize); + for (Asset asset : assets) { + log.trace("Processing asset record: {}", asset); + try { + if (partitionService.isManagedByCurrentService(asset.getTenantId())) { + add(asset.getTenantId(), asset.getAssetProfileId(), asset.getId()); + } + } catch (Exception e) { + log.error("Failed to process asset record: {}", asset, e); + } + } + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java index 3d91790790..8507ebbd42 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java @@ -39,6 +39,8 @@ public interface CalculatedFieldService extends EntityDaoService { PageData findAllCalculatedFields(PageLink pageLink); + List findCalculatedFieldsByTenantId(TenantId tenantId); + PageData findAllCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); void deleteCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); @@ -53,6 +55,8 @@ public interface CalculatedFieldService extends EntityDaoService { List findAllCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); + List findAllCalculatedFieldLinksByTenantId(TenantId tenantId); + PageData findAllCalculatedFieldLinks(PageLink pageLink); boolean referencedInAnyCalculatedField(TenantId tenantId, EntityId referencedEntityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index dceef73aba..464041869e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -104,6 +104,13 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return calculatedFieldDao.findAll(pageLink); } + @Override + public List findCalculatedFieldsByTenantId(TenantId tenantId) { + log.trace("Executing findAllByTenantId, tenantId [{}]", tenantId); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + return calculatedFieldDao.findAllByTenantId(tenantId); + } + @Override public PageData findAllCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink) { log.trace("Executing findAllByEntityId, entityId [{}], pageLink [{}]", entityId, pageLink); @@ -174,6 +181,12 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return calculatedFieldLinkDao.findCalculatedFieldLinksByEntityId(tenantId, entityId); } + @Override + public List findAllCalculatedFieldLinksByTenantId(TenantId tenantId) { + log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId [{}]", tenantId); + return calculatedFieldLinkDao.findCalculatedFieldLinksByTenantId(tenantId); + } + @Override public PageData findAllCalculatedFieldLinks(PageLink pageLink) { log.trace("Executing findAllCalculatedFieldLinks, pageLink [{}]", pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java index 8b4a5e7086..cf168a9b3d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldLinkDao.java @@ -31,6 +31,8 @@ public interface CalculatedFieldLinkDao extends Dao { List findCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId); + List findCalculatedFieldLinksByTenantId(TenantId tenantId); + List findAll(); PageData findAll(PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java index 584a3b5199..aeb8e1b04c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldLinkRepository.java @@ -27,4 +27,6 @@ public interface CalculatedFieldLinkRepository extends JpaRepository findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId); + List findAllByTenantId(UUID tenantId); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java index dbb2fd87da..39b6e6b890 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldLinkDao.java @@ -54,6 +54,11 @@ public class JpaCalculatedFieldLinkDao extends JpaAbstractDao findCalculatedFieldLinksByTenantId(TenantId tenantId) { + return DaoUtil.convertDataList(calculatedFieldLinkRepository.findAllByTenantId(tenantId.getId())); + } + @Override public List findAll() { return DaoUtil.convertDataList(calculatedFieldLinkRepository.findAll());