diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 219a261183..ed35d96cb7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -64,7 +64,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { private final ConcurrentMap> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>(); private final ConcurrentMap calculatedFieldsCtx = new ConcurrentHashMap<>(); - @Value("${calculatedField.initFetchPackSize:50000}") + @Value("${queue.calculated_fields.init_fetch_pack_size:50000}") @Getter private int initFetchPackSize; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 41bf76ff0e..f8c398b016 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -175,7 +175,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBa packSubmitFuture.cancel(true); log.info("Timeout to process message: {}", pendingMsgHolder.getMsg()); } - ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); + ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } consumer.commit(); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 14f0437402..76f148c099 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1848,9 +1848,9 @@ queue: pool_size: "${TB_QUEUE_CF_POOL_SIZE:8}" # RocksDB path for storing CF states rocks_db_path: "${TB_QUEUE_CF_ROCKS_DB_PATH:${user.home}/.rocksdb/cf_states}" - # The fetch size specifies how many rows will be returned + # The fetch size specifies how many rows will be fetched from the database per request for initial fetching init_fetch_pack_size: "${TB_QUEUE_CF_FETCH_PACK_SIZE:50000}" - # The fetch size specifies how many rows will be returned + # The fetch size specifies how many rows will be fetched from the database per request for per-tenant fetching init_tenant_fetch_pack_size: "${TB_QUEUE_CF_TENANT_FETCH_PACK_SIZE:1000}" transport: # For high-priority notifications that require minimum latency and processing time diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index 4a5ecef970..26832f6176 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -52,8 +51,6 @@ public class DefaultTbCoreConsumerServiceTest { private TbCoreConsumerStats statsMock; @Mock private RuleEngineCallService ruleEngineCallServiceMock; - @Mock - private Logger logMock; @Mock private TbCallback tbCallbackMock; @@ -72,7 +69,6 @@ public class DefaultTbCoreConsumerServiceTest { executor = MoreExecutors.newDirectExecutorService(); ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stateService", stateServiceMock); ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "deviceActivityEventsExecutor", executor); - ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "log", logMock); } @AfterEach