test fixes

This commit is contained in:
IrynaMatveieva 2025-06-13 18:04:42 +03:00
parent 6f5e375ae1
commit bb570add28
5 changed files with 19 additions and 7 deletions

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.cf; package org.thingsboard.server.service.cf;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
@ -39,6 +40,7 @@ import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto;
public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService {
@Autowired @Autowired
@Lazy
private ActorSystemContext actorSystemContext; private ActorSystemContext actorSystemContext;
protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService; protected QueueStateService<TbProtoQueueMsg<ToCalculatedFieldMsg>, TbProtoQueueMsg<CalculatedFieldStateProto>> stateService;

View File

@ -30,7 +30,9 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders; import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService; import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
@ -109,7 +111,16 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
if (stateMsgProto == null) { if (stateMsgProto == null) {
putStateId(msg.getHeaders(), stateId); putStateId(msg.getHeaders(), stateId);
} }
stateProducer.send(tpi, stateId.toKey(), msg, null); stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to send state message: {}", stateId, t);
}
});
callback.onSuccess(); callback.onSuccess();
} }

View File

@ -61,8 +61,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
double value = switch (kvEntry.getDataType()) { double value = switch (kvEntry.getDataType()) {
case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow();
case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); case DOUBLE -> kvEntry.getDoubleValue().orElseThrow();
case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); case BOOLEAN, STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString());
case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString());
}; };
expr.setVariable(entry.getKey(), value); expr.setVariable(entry.getKey(), value);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {

View File

@ -95,8 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Value("${sql.ts.value_no_xss_validation:false}") @Value("${sql.ts.value_no_xss_validation:false}")
private boolean valueNoXssValidation; private boolean valueNoXssValidation;
@Value("${sql.ts.thread_pool_size:12}") @Value("${sql.ts.callback_thread_pool_size:12}")
private int threadPoolSize; private int callbackThreadPoolSize;
public DefaultTelemetrySubscriptionService(AttributesService attrService, public DefaultTelemetrySubscriptionService(AttributesService attrService,
TimeseriesService tsService, TimeseriesService tsService,
@ -117,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@PostConstruct @PostConstruct
public void initExecutor() { public void initExecutor() {
super.initExecutor(); super.initExecutor();
tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback"); tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, "ts-service-ts-callback");
} }
@Override @Override

View File

@ -346,7 +346,7 @@ sql:
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic
batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution
value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability
thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor callback_thread_pool_size: "${SQL_TS_CALLBACK_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor
ts_latest: ts_latest:
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds