trigger processing using telemetry sub service in CF node
This commit is contained in:
parent
edd9a6a552
commit
0ade84c842
@ -200,7 +200,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
|
|||||||
|
|
||||||
for (int i = 0; i < entries.size(); i++) {
|
for (int i = 0; i < entries.size(); i++) {
|
||||||
TsKvEntry tsKvEntry = entries.get(i);
|
TsKvEntry tsKvEntry = entries.get(i);
|
||||||
if (result != null) {
|
if (versions != null && !versions.isEmpty() && versions.get(i) != null) {
|
||||||
tsKvEntry.setVersion(versions.get(i));
|
tsKvEntry.setVersion(versions.get(i));
|
||||||
}
|
}
|
||||||
telemetryMsg.addTsData(toTsKvProto(tsKvEntry));
|
telemetryMsg.addTsData(toTsKvProto(tsKvEntry));
|
||||||
|
|||||||
@ -58,7 +58,18 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
|
for (Map.Entry<String, ArgumentEntry> entry : this.arguments.entrySet()) {
|
||||||
try {
|
try {
|
||||||
BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue();
|
BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue();
|
||||||
expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString()));
|
try {
|
||||||
|
double value = switch (kvEntry.getDataType()) {
|
||||||
|
case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow();
|
||||||
|
case DOUBLE -> kvEntry.getDoubleValue().orElseThrow();
|
||||||
|
case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow();
|
||||||
|
case STRING -> Double.parseDouble(kvEntry.getValueAsString());
|
||||||
|
case JSON -> Double.parseDouble(kvEntry.getValueAsString());
|
||||||
|
};
|
||||||
|
expr.setVariable(entry.getKey(), value);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.", e);
|
||||||
|
}
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.");
|
throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number.");
|
||||||
}
|
}
|
||||||
@ -85,7 +96,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
|
|
||||||
private JsonNode createResultJson(boolean preserveMsgTs, String outputName, Object result) {
|
private JsonNode createResultJson(boolean preserveMsgTs, String outputName, Object result) {
|
||||||
ObjectNode valuesNode = JacksonUtil.newObjectNode();
|
ObjectNode valuesNode = JacksonUtil.newObjectNode();
|
||||||
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
|
if (result instanceof Double doubleValue) {
|
||||||
|
valuesNode.put(outputName, doubleValue);
|
||||||
|
} else if (result instanceof Integer integerValue) {
|
||||||
|
valuesNode.put(outputName, integerValue);
|
||||||
|
} else {
|
||||||
|
valuesNode.set(outputName, JacksonUtil.valueToTree(result));
|
||||||
|
}
|
||||||
|
|
||||||
long lastTimestamp = getLastUpdateTimestamp();
|
long lastTimestamp = getLastUpdateTimestamp();
|
||||||
if (preserveMsgTs && lastTimestamp != -1) {
|
if (preserveMsgTs && lastTimestamp != -1) {
|
||||||
|
|||||||
@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.common.util.DonAsynchron;
|
import org.thingsboard.common.util.DonAsynchron;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
|
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
|
||||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||||
import org.thingsboard.rule.engine.api.DeviceStateManager;
|
import org.thingsboard.rule.engine.api.DeviceStateManager;
|
||||||
@ -69,7 +69,6 @@ import java.util.Map;
|
|||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static java.util.Comparator.comparing;
|
import static java.util.Comparator.comparing;
|
||||||
@ -96,6 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
|
|
||||||
@Value("${sql.ts.value_no_xss_validation:false}")
|
@Value("${sql.ts.value_no_xss_validation:false}")
|
||||||
private boolean valueNoXssValidation;
|
private boolean valueNoXssValidation;
|
||||||
|
@Value("${sql.ts.thread_pool_size:12}")
|
||||||
|
private int threadPoolSize;
|
||||||
|
|
||||||
public DefaultTelemetrySubscriptionService(AttributesService attrService,
|
public DefaultTelemetrySubscriptionService(AttributesService attrService,
|
||||||
TimeseriesService tsService,
|
TimeseriesService tsService,
|
||||||
@ -116,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void initExecutor() {
|
public void initExecutor() {
|
||||||
super.initExecutor();
|
super.initExecutor();
|
||||||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback"));
|
tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "ts-service-ts-callback");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -346,6 +346,7 @@ sql:
|
|||||||
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic
|
stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic
|
||||||
batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution
|
batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution
|
||||||
value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability
|
value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability
|
||||||
|
thread_pool_size: "${SQL_TS_THREAD_POOL_SIZE:12}"# Thread pool size to execute dynamic queries
|
||||||
ts_latest:
|
ts_latest:
|
||||||
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates
|
batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates
|
||||||
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds
|
batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds
|
||||||
|
|||||||
@ -57,6 +57,7 @@ public class AttributesSaveRequest implements CalculatedFieldSystemAwareRequest
|
|||||||
public static final Strategy PROCESS_ALL = new Strategy(true, true, true);
|
public static final Strategy PROCESS_ALL = new Strategy(true, true, true);
|
||||||
public static final Strategy WS_ONLY = new Strategy(false, true, false);
|
public static final Strategy WS_ONLY = new Strategy(false, true, false);
|
||||||
public static final Strategy SKIP_ALL = new Strategy(false, false, false);
|
public static final Strategy SKIP_ALL = new Strategy(false, false, false);
|
||||||
|
public static final Strategy CF_ONLY = new Strategy(false, false, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -56,6 +56,7 @@ public class TimeseriesSaveRequest implements CalculatedFieldSystemAwareRequest
|
|||||||
public static final Strategy WS_ONLY = new Strategy(false, false, true, false);
|
public static final Strategy WS_ONLY = new Strategy(false, false, true, false);
|
||||||
public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false);
|
public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false);
|
||||||
public static final Strategy SKIP_ALL = new Strategy(false, false, false, false);
|
public static final Strategy SKIP_ALL = new Strategy(false, false, false, false);
|
||||||
|
public static final Strategy CF_ONLY = new Strategy(false, false, false, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -92,13 +92,14 @@ public class TbCalculatedFieldsNode implements TbNode {
|
|||||||
.customerId(msg.getCustomerId())
|
.customerId(msg.getCustomerId())
|
||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entries(tsKvEntryList)
|
.entries(tsKvEntryList)
|
||||||
|
.strategy(TimeseriesSaveRequest.Strategy.CF_ONLY)
|
||||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||||
.tbMsgId(msg.getId())
|
.tbMsgId(msg.getId())
|
||||||
.tbMsgType(msg.getInternalType())
|
.tbMsgType(msg.getInternalType())
|
||||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback());
|
ctx.getTelemetryService().saveTimeseries(timeseriesSaveRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processPostAttributesRequest(TbContext ctx, TbMsg msg) {
|
private void processPostAttributesRequest(TbContext ctx, TbMsg msg) {
|
||||||
@ -114,12 +115,13 @@ public class TbCalculatedFieldsNode implements TbNode {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
|
.scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE)))
|
||||||
.entries(newAttributes)
|
.entries(newAttributes)
|
||||||
|
.strategy(AttributesSaveRequest.Strategy.CF_ONLY)
|
||||||
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
.previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds())
|
||||||
.tbMsgId(msg.getId())
|
.tbMsgId(msg.getId())
|
||||||
.tbMsgType(msg.getInternalType())
|
.tbMsgType(msg.getInternalType())
|
||||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||||
.build();
|
.build();
|
||||||
ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback());
|
ctx.getTelemetryService().saveAttributes(attributesSaveRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user