diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index b5c755ad4b..ad248cc3d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -80,8 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer private long pollInterval; @Value("${queue.calculated_fields.pack_processing_timeout:60000}") private long packProcessingTimeout; - @Value("${queue.calculated_fields.pool_size:8}") - private int poolSize; private final TbRuleEngineQueueFactory queueFactory; private final CalculatedFieldStateService stateService; @@ -148,14 +146,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer } } - private boolean[] partitionsToBooleanIndexArray(Set partitions) { - boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()]; - for (var tpi : partitions) { - tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true); - } - return myPartitions; - } - private void processMsgs(List> msgs, TbQueueConsumer> consumer, QueueConfig config) throws Exception { List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); ConcurrentMap> pendingMap = orderedMsgList.stream().collect( diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index badd779350..3cc28ad702 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1759,7 +1759,6 @@ queue: enabled: "${TB_EDQS_STATS_ENABLED:true}" # Statistics printing interval for EDQS print-interval-ms: "${TB_EDQS_STATS_PRINT_INTERVAL_MS:300000}" - vc: # Default topic name topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" @@ -1816,9 +1815,7 @@ queue: # For high-priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}" # Interval in milliseconds to poll messages by CF (Rule Engine) microservices - poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}" - # Amount of partitions used by CF microservices - partitions: "${TB_QUEUE_CF_PARTITIONS:10}" + poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:1000}" # Timeout for processing a message pack by CF microservices pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}" # Thread pool size for processing of the incoming messages diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index eaa33e99d9..345b44e764 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService { private String cfEventTopic; @Value("${queue.calculated_fields.state_topic:tb_cf_state}") private String cfStateTopic; - @Value("${queue.calculated_fields.partitions:10}") - private Integer cfPartitions; @Value("${queue.vc.topic:tb_version_control}") private String vcTopic; @Value("${queue.vc.partitions:10}") @@ -572,11 +570,6 @@ public class HashPartitionService implements PartitionService { return list == null ? 0 : list.size(); } - @Override - public int getTotalCalculatedFieldPartitions() { - return cfPartitions; - } - private Map> getServiceKeyListMap(List services) { final Map> currentMap = new HashMap<>(); services.forEach(serviceInfo -> { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index c7d5bd7acf..7abd68e25f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -79,6 +79,4 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); - int getTotalCalculatedFieldPartitions(); - }