CF parameters cleanup

This commit is contained in:
Andrii Shvaika 2025-03-11 16:55:58 +02:00
parent 24c52c47cd
commit 55f9f66312
4 changed files with 1 additions and 23 deletions

View File

@ -80,8 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private long pollInterval;
@Value("${queue.calculated_fields.pack_processing_timeout:60000}")
private long packProcessingTimeout;
@Value("${queue.calculated_fields.pool_size:8}")
private int poolSize;
private final TbRuleEngineQueueFactory queueFactory;
private final CalculatedFieldStateService stateService;
@ -148,14 +146,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
}
}
private boolean[] partitionsToBooleanIndexArray(Set<TopicPartitionInfo> partitions) {
boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()];
for (var tpi : partitions) {
tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true);
}
return myPartitions;
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(

View File

@ -1759,7 +1759,6 @@ queue:
enabled: "${TB_EDQS_STATS_ENABLED:true}"
# Statistics printing interval for EDQS
print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}"
vc:
# Default topic name
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
@ -1816,9 +1815,7 @@ queue:
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}"
# Interval in milliseconds to poll messages by CF (Rule Engine) microservices
poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}"
# Amount of partitions used by CF microservices
partitions: "${TB_QUEUE_CF_PARTITIONS:10}"
poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:1000}"
# Timeout for processing a message pack by CF microservices
pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}"
# Thread pool size for processing of the incoming messages

View File

@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService {
private String cfEventTopic;
@Value("${queue.calculated_fields.state_topic:tb_cf_state}")
private String cfStateTopic;
@Value("${queue.calculated_fields.partitions:10}")
private Integer cfPartitions;
@Value("${queue.vc.topic:tb_version_control}")
private String vcTopic;
@Value("${queue.vc.partitions:10}")
@ -572,11 +570,6 @@ public class HashPartitionService implements PartitionService {
return list == null ? 0 : list.size();
}
@Override
public int getTotalCalculatedFieldPartitions() {
return cfPartitions;
}
private Map<QueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
services.forEach(serviceInfo -> {

View File

@ -79,6 +79,4 @@ public interface PartitionService {
int resolvePartitionIndex(UUID entityId, int partitions);
int getTotalCalculatedFieldPartitions();
}