diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index b5fa1e6fbd..26c82a33de 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -106,6 +106,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; @@ -459,6 +460,11 @@ public class ActorSystemContext { @Getter private DebugModeRateLimitsConfig debugModeRateLimitsConfig; + @Lazy + @Autowired(required = false) + @Getter + private TbQueueCalculatedFieldSettings calculatedFieldSettings; + /** * The following Service will be null if we operate in tb-core mode */ diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index a896bbd74b..ba15bd10f2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.actors.calculatedField; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; @@ -47,6 +45,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache; @@ -82,14 +81,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; private final TenantEntityProfileCache entityProfileCache; + private final TbQueueCalculatedFieldSettings cfSettings; protected final TenantId tenantId; protected TbActorCtx ctx; - @Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}") - @Getter - private int initFetchPackSize; - CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); this.cfExecService = systemContext.getCalculatedFieldProcessingService(); @@ -100,6 +96,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware this.assetProfileCache = systemContext.getAssetProfileCache(); this.deviceProfileCache = systemContext.getDeviceProfileCache(); this.entityProfileCache = new TenantEntityProfileCache(); + this.cfSettings = systemContext.getCalculatedFieldSettings(); this.tenantId = tenantId; } @@ -566,7 +563,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } public void initCalculatedFields() { - PageDataIterable cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize); + PageDataIterable cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize()); cfs.forEach(cf -> { log.trace("Processing calculated field record: {}", cf); try { @@ -578,14 +575,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware calculatedFields.values().forEach(cf -> { entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf); }); - PageDataIterable cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), initFetchPackSize); + PageDataIterable cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize()); cfls.forEach(link -> { onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)); }); } private void initEntityProfileCache() { - PageDataIterable deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); + PageDataIterable deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize()); for (ProfileEntityIdInfo idInfo : deviceIdInfos) { log.trace("Processing device record: {}", idInfo); try { @@ -594,7 +591,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.error("Failed to process device record: {}", idInfo, e); } } - PageDataIterable assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); + PageDataIterable assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize()); for (ProfileEntityIdInfo idInfo : assetIdInfos) { log.trace("Processing asset record: {}", idInfo); try { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCalculatedFieldSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCalculatedFieldSettings.java index c2de8eff4e..9a87e88416 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCalculatedFieldSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueCalculatedFieldSettings.java @@ -31,5 +31,7 @@ public class TbQueueCalculatedFieldSettings { @Value("${queue.calculated_fields.state_topic}") private String stateTopic; + @Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}") + private int initTenantFetchPackSize; }