From bb570add28d7e93d0834d67b4cdf7d553a9ab9e7 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 13 Jun 2025 18:04:42 +0300 Subject: [PATCH] test fixes --- .../cf/AbstractCalculatedFieldStateService.java | 2 ++ .../ctx/state/KafkaCalculatedFieldStateService.java | 13 ++++++++++++- .../cf/ctx/state/SimpleCalculatedFieldState.java | 3 +-- .../DefaultTelemetrySubscriptionService.java | 6 +++--- application/src/main/resources/thingsboard.yml | 2 +- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index f1cb25c6fa..70b41f069e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.cf; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -39,6 +40,7 @@ import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { @Autowired + @Lazy private ActorSystemContext actorSystemContext; protected QueueStateService, TbProtoQueueMsg> stateService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index 6641a06b1d..2b52892744 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -30,7 +30,9 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgHeaders; +import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.state.KafkaQueueStateService; @@ -109,7 +111,16 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta if (stateMsgProto == null) { putStateId(msg.getHeaders(), stateId); } - stateProducer.send(tpi, stateId.toKey(), msg, null); + stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + } + + @Override + public void onFailure(Throwable t) { + log.error("Failed to send state message: {}", stateId, t); + } + }); callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index 577ff80219..026461bd48 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -61,8 +61,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { double value = switch (kvEntry.getDataType()) { case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); - case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); - case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); + case BOOLEAN, STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); }; expr.setVariable(entry.getKey(), value); } catch (NumberFormatException e) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a14e052659..69b41addf9 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -95,8 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Value("${sql.ts.value_no_xss_validation:false}") private boolean valueNoXssValidation; - @Value("${sql.ts.thread_pool_size:12}") - private int threadPoolSize; + @Value("${sql.ts.callback_thread_pool_size:12}") + private int callbackThreadPoolSize; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, @@ -117,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @PostConstruct public void initExecutor() { super.initExecutor(); - tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback"); + tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, "ts-service-ts-callback"); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 958ad2f84c..c3d28f9fce 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,7 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability - thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor + callback_thread_pool_size: "${SQL_TS_CALLBACK_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds