diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index bf206bc5dc..0c2d1a24a9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -200,7 +200,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS for (int i = 0; i < entries.size(); i++) { TsKvEntry tsKvEntry = entries.get(i); - if (result != null) { + if (versions != null && !versions.isEmpty() && versions.get(i) != null) { tsKvEntry.setVersion(versions.get(i)); } telemetryMsg.addTsData(toTsKvProto(tsKvEntry)); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index d0eba5031c..03399348cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -58,7 +58,18 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { for (Map.Entry entry : this.arguments.entrySet()) { try { BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue(); - expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString())); + try { + double value = switch (kvEntry.getDataType()) { + case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); + case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); + case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); + case STRING -> Double.parseDouble(kvEntry.getValueAsString()); + case JSON -> Double.parseDouble(kvEntry.getValueAsString()); + }; + expr.setVariable(entry.getKey(), value); + } catch (Exception e) { + throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.", e); + } } catch (NumberFormatException e) { throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number."); } @@ -85,7 +96,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { private JsonNode createResultJson(boolean preserveMsgTs, String outputName, Object result) { ObjectNode valuesNode = JacksonUtil.newObjectNode(); - valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + if (result instanceof Double doubleValue) { + valuesNode.put(outputName, doubleValue); + } else if (result instanceof Integer integerValue) { + valuesNode.put(outputName, integerValue); + } else { + valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + } long lastTimestamp = getLastUpdateTimestamp(); if (preserveMsgTs && lastTimestamp != -1) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 30f2885e78..0f4f615af3 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.DeviceStateManager; @@ -69,7 +69,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; import static java.util.Comparator.comparing; @@ -96,6 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Value("${sql.ts.value_no_xss_validation:false}") private boolean valueNoXssValidation; + @Value("${sql.ts.thread_pool_size:12}") + private int threadPoolSize; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, @@ -116,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @PostConstruct public void initExecutor() { super.initExecutor(); - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback")); + tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback"); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 6d27b9bd7e..e170695b90 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,6 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability + thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}"# Thread pool size to execute dynamic queries ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index c3095836bd..19c1956463 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -57,6 +57,7 @@ public class AttributesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy PROCESS_ALL = new Strategy(true, true, true); public static final Strategy WS_ONLY = new Strategy(false, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, true); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index c402a0c984..3bacf8d8c3 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -56,6 +56,7 @@ public class TimeseriesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy WS_ONLY = new Strategy(false, false, true, false); public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, false, true); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index b2cce52c87..4e319500d1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -92,13 +92,14 @@ public class TbCalculatedFieldsNode implements TbNode { .customerId(msg.getCustomerId()) .entityId(msg.getOriginator()) .entries(tsKvEntryList) + .strategy(TimeseriesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); + ctx.getTelemetryService().saveTimeseries(timeseriesSaveRequest); } private void processPostAttributesRequest(TbContext ctx, TbMsg msg) { @@ -114,12 +115,13 @@ public class TbCalculatedFieldsNode implements TbNode { .entityId(msg.getOriginator()) .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) .entries(newAttributes) + .strategy(AttributesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); + ctx.getTelemetryService().saveAttributes(attributesSaveRequest); } }