added initialization of cache when partiton change event

This commit is contained in:
IrynaMatveieva 2025-04-01 10:22:29 +03:00
parent e242e7846c
commit 107c5c2ef4
9 changed files with 97 additions and 11 deletions

View File

@ -513,6 +513,21 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) {
initCalculatedFields();
ctx.broadcastToChildren(msg, true);
}
public void initCalculatedFields() {
cfDaoService.findCalculatedFieldsByTenantId(tenantId).forEach(cf -> {
try {
onFieldInitMsg(new CalculatedFieldInitMsg(cf.getTenantId(), cf));
} catch (CalculatedFieldException e) {
throw new RuntimeException(e);
}
});
cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId).forEach(cfLink -> {
onLinkInitMsg(new CalculatedFieldLinkInitMsg(cfLink.getTenantId(), cfLink));
});
}
}

View File

@ -274,6 +274,7 @@ public class TenantActor extends RuleChainManagerActor {
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
() -> true);
cfActor.tellWithHighPriority(msg);
} catch (Exception e) {
log.info("[{}] Failed to init CF Actor.", tenantId, e);
}

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.ProfileEntityIdInfo;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
@ -37,6 +38,7 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
private final CalculatedFieldEntityProfileCache entityProfileCache;
private final AssetService assetService;
private final DeviceService deviceService;
private final PartitionService partitionService;
@Value("${calculated_fields.init_fetch_pack_size:50000}")
@Getter
@ -48,7 +50,9 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
for (ProfileEntityIdInfo idInfo : deviceIdInfos) {
log.trace("Processing device record: {}", idInfo);
try {
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId());
if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) {
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId());
}
} catch (Exception e) {
log.error("Failed to process device record: {}", idInfo, e);
}
@ -57,7 +61,9 @@ public class DefaultCalculatedFieldInitService implements CalculatedFieldInitSer
for (ProfileEntityIdInfo idInfo : assetIdInfos) {
log.trace("Processing asset record: {}", idInfo);
try {
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId());
if (partitionService.isManagedByCurrentService(idInfo.getTenantId())) {
entityProfileCache.add(idInfo.getTenantId(), idInfo.getProfileId(), idInfo.getEntityId());
}
} catch (Exception e) {
log.error("Failed to process asset record: {}", idInfo, e);
}

View File

@ -15,20 +15,26 @@
*/
package org.thingsboard.server.service.cf.cache;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -42,13 +48,16 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
private static final Integer UNKNOWN = 0;
private final ConcurrentMap<TenantId, TenantEntityProfileCache> tenantCache = new ConcurrentHashMap<>();
private final PartitionService partitionService;
private final AssetService assetService;
private final DeviceService deviceService;
@Value("${calculated_fields.init_fetch_pack_size:50000}")
@Getter
private int initFetchPackSize;
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
event.getCfPartitions().forEach(tpi -> {
Optional<TenantId> tenantIdOpt = tpi.getTenantId();
tenantIdOpt.ifPresent(tenantId -> tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()));
});
event.getCfPartitions().forEach(tpi -> tpi.getTenantId().ifPresent(this::initCacheForNewTenant));
}
@Override
@ -58,10 +67,14 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
//TODO: make this method atomic;
cache.remove(oldProfileId, entityId);
cache.add(newProfileId, entityId);
tenantCache.compute(tenantId, (id, cache) -> {
if (cache == null) {
cache = new TenantEntityProfileCache();
}
cache.remove(oldProfileId, entityId);
cache.add(newProfileId, entityId);
return cache;
});
}
@Override
@ -91,4 +104,29 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
return tpi.getPartition().orElse(UNKNOWN);
}
private void initCacheForNewTenant(TenantId tenantId) {
PageDataIterable<Device> devices = new PageDataIterable<>(pageLink -> deviceService.findDevicesByTenantId(tenantId, pageLink), initFetchPackSize);
for (Device device : devices) {
log.trace("Processing device record: {}", device);
try {
if (partitionService.isManagedByCurrentService(device.getTenantId())) {
add(device.getTenantId(), device.getDeviceProfileId(), device.getId());
}
} catch (Exception e) {
log.error("Failed to process device record: {}", device, e);
}
}
PageDataIterable<Asset> assets = new PageDataIterable<>(pageLink -> assetService.findAssetsByTenantId(tenantId, pageLink), initFetchPackSize);
for (Asset asset : assets) {
log.trace("Processing asset record: {}", asset);
try {
if (partitionService.isManagedByCurrentService(asset.getTenantId())) {
add(asset.getTenantId(), asset.getAssetProfileId(), asset.getId());
}
} catch (Exception e) {
log.error("Failed to process asset record: {}", asset, e);
}
}
}
}

View File

@ -39,6 +39,8 @@ public interface CalculatedFieldService extends EntityDaoService {
PageData<CalculatedField> findAllCalculatedFields(PageLink pageLink);
List<CalculatedField> findCalculatedFieldsByTenantId(TenantId tenantId);
PageData<CalculatedField> findAllCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink);
void deleteCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
@ -53,6 +55,8 @@ public interface CalculatedFieldService extends EntityDaoService {
List<CalculatedFieldLink> findAllCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
List<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId);
PageData<CalculatedFieldLink> findAllCalculatedFieldLinks(PageLink pageLink);
boolean referencedInAnyCalculatedField(TenantId tenantId, EntityId referencedEntityId);

View File

@ -104,6 +104,13 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return calculatedFieldDao.findAll(pageLink);
}
@Override
public List<CalculatedField> findCalculatedFieldsByTenantId(TenantId tenantId) {
log.trace("Executing findAllByTenantId, tenantId [{}]", tenantId);
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
return calculatedFieldDao.findAllByTenantId(tenantId);
}
@Override
public PageData<CalculatedField> findAllCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink) {
log.trace("Executing findAllByEntityId, entityId [{}], pageLink [{}]", entityId, pageLink);
@ -174,6 +181,12 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return calculatedFieldLinkDao.findCalculatedFieldLinksByEntityId(tenantId, entityId);
}
@Override
public List<CalculatedFieldLink> findAllCalculatedFieldLinksByTenantId(TenantId tenantId) {
log.trace("Executing findAllCalculatedFieldLinksByTenantId, tenantId [{}]", tenantId);
return calculatedFieldLinkDao.findCalculatedFieldLinksByTenantId(tenantId);
}
@Override
public PageData<CalculatedFieldLink> findAllCalculatedFieldLinks(PageLink pageLink) {
log.trace("Executing findAllCalculatedFieldLinks, pageLink [{}]", pageLink);

View File

@ -31,6 +31,8 @@ public interface CalculatedFieldLinkDao extends Dao<CalculatedFieldLink> {
List<CalculatedFieldLink> findCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
List<CalculatedFieldLink> findCalculatedFieldLinksByTenantId(TenantId tenantId);
List<CalculatedFieldLink> findAll();
PageData<CalculatedFieldLink> findAll(PageLink pageLink);

View File

@ -27,4 +27,6 @@ public interface CalculatedFieldLinkRepository extends JpaRepository<CalculatedF
List<CalculatedFieldLinkEntity> findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId);
List<CalculatedFieldLinkEntity> findAllByTenantId(UUID tenantId);
}

View File

@ -54,6 +54,11 @@ public class JpaCalculatedFieldLinkDao extends JpaAbstractDao<CalculatedFieldLin
return DaoUtil.convertDataList(calculatedFieldLinkRepository.findAllByTenantIdAndEntityId(tenantId.getId(), entityId.getId()));
}
@Override
public List<CalculatedFieldLink> findCalculatedFieldLinksByTenantId(TenantId tenantId) {
return DaoUtil.convertDataList(calculatedFieldLinkRepository.findAllByTenantId(tenantId.getId()));
}
@Override
public List<CalculatedFieldLink> findAll() {
return DaoUtil.convertDataList(calculatedFieldLinkRepository.findAll());