moved initFetchPackSize property to settings
This commit is contained in:
parent
122dea9feb
commit
12739b60ce
@ -106,6 +106,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
|||||||
import org.thingsboard.server.queue.discovery.DiscoveryService;
|
import org.thingsboard.server.queue.discovery.DiscoveryService;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
|
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
|
||||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
|
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
|
||||||
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
|
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
|
||||||
@ -459,6 +460,11 @@ public class ActorSystemContext {
|
|||||||
@Getter
|
@Getter
|
||||||
private DebugModeRateLimitsConfig debugModeRateLimitsConfig;
|
private DebugModeRateLimitsConfig debugModeRateLimitsConfig;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Autowired(required = false)
|
||||||
|
@Getter
|
||||||
|
private TbQueueCalculatedFieldSettings calculatedFieldSettings;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The following Service will be null if we operate in tb-core mode
|
* The following Service will be null if we operate in tb-core mode
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -15,9 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.actors.calculatedField;
|
package org.thingsboard.server.actors.calculatedField;
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.actors.TbActorCtx;
|
import org.thingsboard.server.actors.TbActorCtx;
|
||||||
import org.thingsboard.server.actors.TbActorRef;
|
import org.thingsboard.server.actors.TbActorRef;
|
||||||
@ -47,6 +45,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
|
|||||||
import org.thingsboard.server.dao.asset.AssetService;
|
import org.thingsboard.server.dao.asset.AssetService;
|
||||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||||
import org.thingsboard.server.dao.device.DeviceService;
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
|
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
|
||||||
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
|
import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
|
||||||
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
|
import org.thingsboard.server.service.cf.CalculatedFieldStateService;
|
||||||
import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache;
|
import org.thingsboard.server.service.cf.cache.TenantEntityProfileCache;
|
||||||
@ -82,14 +81,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
private final TbAssetProfileCache assetProfileCache;
|
private final TbAssetProfileCache assetProfileCache;
|
||||||
private final TbDeviceProfileCache deviceProfileCache;
|
private final TbDeviceProfileCache deviceProfileCache;
|
||||||
private final TenantEntityProfileCache entityProfileCache;
|
private final TenantEntityProfileCache entityProfileCache;
|
||||||
|
private final TbQueueCalculatedFieldSettings cfSettings;
|
||||||
protected final TenantId tenantId;
|
protected final TenantId tenantId;
|
||||||
|
|
||||||
protected TbActorCtx ctx;
|
protected TbActorCtx ctx;
|
||||||
|
|
||||||
@Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}")
|
|
||||||
@Getter
|
|
||||||
private int initFetchPackSize;
|
|
||||||
|
|
||||||
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
|
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
|
||||||
super(systemContext);
|
super(systemContext);
|
||||||
this.cfExecService = systemContext.getCalculatedFieldProcessingService();
|
this.cfExecService = systemContext.getCalculatedFieldProcessingService();
|
||||||
@ -100,6 +96,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
this.assetProfileCache = systemContext.getAssetProfileCache();
|
this.assetProfileCache = systemContext.getAssetProfileCache();
|
||||||
this.deviceProfileCache = systemContext.getDeviceProfileCache();
|
this.deviceProfileCache = systemContext.getDeviceProfileCache();
|
||||||
this.entityProfileCache = new TenantEntityProfileCache();
|
this.entityProfileCache = new TenantEntityProfileCache();
|
||||||
|
this.cfSettings = systemContext.getCalculatedFieldSettings();
|
||||||
this.tenantId = tenantId;
|
this.tenantId = tenantId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,7 +563,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void initCalculatedFields() {
|
public void initCalculatedFields() {
|
||||||
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), initFetchPackSize);
|
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(pageLink -> cfDaoService.findCalculatedFieldsByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
|
||||||
cfs.forEach(cf -> {
|
cfs.forEach(cf -> {
|
||||||
log.trace("Processing calculated field record: {}", cf);
|
log.trace("Processing calculated field record: {}", cf);
|
||||||
try {
|
try {
|
||||||
@ -578,14 +575,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
calculatedFields.values().forEach(cf -> {
|
calculatedFields.values().forEach(cf -> {
|
||||||
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf);
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf);
|
||||||
});
|
});
|
||||||
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), initFetchPackSize);
|
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(pageLink -> cfDaoService.findAllCalculatedFieldLinksByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
|
||||||
cfls.forEach(link -> {
|
cfls.forEach(link -> {
|
||||||
onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link));
|
onLinkInitMsg(new CalculatedFieldLinkInitMsg(link.getTenantId(), link));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initEntityProfileCache() {
|
private void initEntityProfileCache() {
|
||||||
PageDataIterable<ProfileEntityIdInfo> deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize);
|
PageDataIterable<ProfileEntityIdInfo> deviceIdInfos = new PageDataIterable<>(pageLink -> deviceService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
|
||||||
for (ProfileEntityIdInfo idInfo : deviceIdInfos) {
|
for (ProfileEntityIdInfo idInfo : deviceIdInfos) {
|
||||||
log.trace("Processing device record: {}", idInfo);
|
log.trace("Processing device record: {}", idInfo);
|
||||||
try {
|
try {
|
||||||
@ -594,7 +591,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
|
|||||||
log.error("Failed to process device record: {}", idInfo, e);
|
log.error("Failed to process device record: {}", idInfo, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PageDataIterable<ProfileEntityIdInfo> assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), initFetchPackSize);
|
PageDataIterable<ProfileEntityIdInfo> assetIdInfos = new PageDataIterable<>(pageLink -> assetService.findProfileEntityIdInfosByTenantId(tenantId, pageLink), cfSettings.getInitTenantFetchPackSize());
|
||||||
for (ProfileEntityIdInfo idInfo : assetIdInfos) {
|
for (ProfileEntityIdInfo idInfo : assetIdInfos) {
|
||||||
log.trace("Processing asset record: {}", idInfo);
|
log.trace("Processing asset record: {}", idInfo);
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -31,5 +31,7 @@ public class TbQueueCalculatedFieldSettings {
|
|||||||
@Value("${queue.calculated_fields.state_topic}")
|
@Value("${queue.calculated_fields.state_topic}")
|
||||||
private String stateTopic;
|
private String stateTopic;
|
||||||
|
|
||||||
|
@Value("${queue.calculated_fields.init_tenant_fetch_pack_size:1000}")
|
||||||
|
private int initTenantFetchPackSize;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user