Merge pull request #13189 from irynamatveieva/cf-fix

Cf fixes
This commit is contained in:
Viacheslav Klimov 2025-04-11 17:30:59 +03:00 committed by GitHub
commit 496d3ccea1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 16 additions and 13 deletions

View File

@ -106,6 +106,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
@ -459,6 +460,11 @@ public class ActorSystemContext {
@Getter @Getter
private DebugModeRateLimitsConfig debugModeRateLimitsConfig; private DebugModeRateLimitsConfig debugModeRateLimitsConfig;
@Lazy
@Autowired(required = false)
@Getter
private TbQueueCalculatedFieldSettings calculatedFieldSettings;
/** /**
* The following Service will be null if we operate in tb-core mode * The following Service will be null if we operate in tb-core mode
*/ */

View File

@ -15,9 +15,7 @@
*/ */
package org.thingsboard.server.actors.calculatedField; package org.thingsboard.server.actors.calculatedField;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.actors.TbActorRef;
@ -47,6 +45,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.CalculatedFieldStateService;
import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache; import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache;
@ -82,14 +81,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final TbAssetProfileCache assetProfileCache; private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache; private final TbDeviceProfileCache deviceProfileCache;
private final TenantEntityProfileCache entityProfileCache; private final TenantEntityProfileCache entityProfileCache;
private final TbQueueCalculatedFieldSettings cfSettings;
protected final TenantId tenantId; protected final TenantId tenantId;
protected TbActorCtx ctx; protected TbActorCtx ctx;
@Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}")
@Getter
private int initFetchPackSize;
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext); super(systemContext);
this.cfExecService = systemContext.getCalculatedFieldProcessingService(); this.cfExecService = systemContext.getCalculatedFieldProcessingService();
@ -100,6 +96,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
this.assetProfileCache = systemContext.getAssetProfileCache(); this.assetProfileCache = systemContext.getAssetProfileCache();
this.deviceProfileCache = systemContext.getDeviceProfileCache(); this.deviceProfileCache = systemContext.getDeviceProfileCache();
this.entityProfileCache = new TenantEntityProfileCache(); this.entityProfileCache = new TenantEntityProfileCache();
this.cfSettings = systemContext.getCalculatedFieldSettings();
this.tenantId = tenantId; this.tenantId = tenantId;
} }
@ -538,8 +535,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
return switch (entityId.getEntityType()) { return switch (entityId.getEntityType()) {
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
default -> default -> null;
throw new IllegalArgumentException("'" + entityId.getEntityType() + "' is not profile entity." + entityId);
}; };
} }
@ -566,7 +562,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} }
public void initCalculatedFields() { public void initCalculatedFields() {
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize); PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
cfs.forEach(cf -> { cfs.forEach(cf -> {
log.trace("Processing calculated field record: {}", cf); log.trace("Processing calculated field record: {}", cf);
try { try {
@ -578,14 +574,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
calculatedFields.values().forEach(cf -> { calculatedFields.values().forEach(cf -> {
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf); entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf);
}); });
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), initFetchPackSize); PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
cfls.forEach(link -> { cfls.forEach(link -> {
onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)); onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link));
}); });
} }
private void initEntityProfileCache() { private void initEntityProfileCache() {
PageDataIterable<ProfileEntityIdInfo> deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); PageDataIterable<ProfileEntityIdInfo> deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
for (ProfileEntityIdInfo idInfo : deviceIdInfos) { for (ProfileEntityIdInfo idInfo : deviceIdInfos) {
log.trace("Processing device record: {}", idInfo); log.trace("Processing device record: {}", idInfo);
try { try {
@ -594,7 +590,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.error("Failed to process device record: {}", idInfo, e); log.error("Failed to process device record: {}", idInfo, e);
} }
} }
PageDataIterable<ProfileEntityIdInfo> assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize); PageDataIterable<ProfileEntityIdInfo> assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
for (ProfileEntityIdInfo idInfo : assetIdInfos) { for (ProfileEntityIdInfo idInfo : assetIdInfos) {
log.trace("Processing asset record: {}", idInfo); log.trace("Processing asset record: {}", idInfo);
try { try {

View File

@ -283,7 +283,6 @@ public class TenantActor extends RuleChainManagerActor {
log.info("[{}] Failed to init CF Actor.", tenantId, e); log.info("[{}] Failed to init CF Actor.", tenantId, e);
} }
} }
cfActor.tellWithHighPriority(msg);
if (!ruleChainsInitialized) { if (!ruleChainsInitialized) {
log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId); log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
initRuleChains(); initRuleChains();

View File

@ -31,5 +31,7 @@ public class TbQueueCalculatedFieldSettings {
@Value("${queue.calculated_fields.state_topic}") @Value("${queue.calculated_fields.state_topic}")
private String stateTopic; private String stateTopic;
@Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}")
private int initTenantFetchPackSize;
} }