changed calculation result
This commit is contained in:
parent
7911384f37
commit
7642a88460
@ -245,7 +245,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
|
|||||||
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
|
state.checkStateSize(ctxId, ctx.getMaxStateSizeInKBytes());
|
||||||
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
|
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
|
||||||
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
|
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
|
||||||
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResultMap()), null);
|
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
callback.onSuccess(); // State was updated but no calculation performed;
|
callback.onSuccess(); // State was updated but no calculation performed;
|
||||||
|
|||||||
@ -84,14 +84,18 @@ public class CalculatedFieldController extends BaseController {
|
|||||||
private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n"
|
private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n"
|
||||||
+ MARKDOWN_CODE_BLOCK_START
|
+ MARKDOWN_CODE_BLOCK_START
|
||||||
+ "{\n" +
|
+ "{\n" +
|
||||||
" \"expression\": \"var temp = 0; foreach(element: temperature.entrySet()) { temp += element.getValue(); } var avgTemperature = temp / temperature.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity; return { \\\"adjustedTemperature\\\": adjustedTemperature };\",\n" +
|
" \"expression\": \"var temp = 0; foreach(element: temperature.values) {temp += element.value;} var avgTemperature = temp / temperature.values.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity.value; return {\"adjustedTemperature\": adjustedTemperature};\",\n" +
|
||||||
" \"arguments\": {\n" +
|
" \"arguments\": {\n" +
|
||||||
" \"temperature\": {\n" +
|
" \"temperature\": {\n" +
|
||||||
" \"14327856345\": 22.4,\n" +
|
" \"values\": [\n" +
|
||||||
" \"14327857298\": 21.9,\n" +
|
" { \"ts\": 1739775639851, \"value\": 23 },\n" +
|
||||||
" \"14327857510\": 22.0\n" +
|
" { \"ts\": 1739775664561, \"value\": 43 },\n" +
|
||||||
|
" { \"ts\": 1739775713079, \"value\": 15 },\n" +
|
||||||
|
" { \"ts\": 1739775999522, \"value\": 34 },\n" +
|
||||||
|
" { \"ts\": 1739776228452, \"value\": 22 }\n" +
|
||||||
|
" ]\n" +
|
||||||
" },\n" +
|
" },\n" +
|
||||||
" \"humidity\": 42\n" +
|
" \"humidity\": { \"ts\": 1739776478057, \"value\": 23 }\n" +
|
||||||
" }\n" +
|
" }\n" +
|
||||||
"}"
|
"}"
|
||||||
+ MARKDOWN_CODE_BLOCK_END
|
+ MARKDOWN_CODE_BLOCK_END
|
||||||
|
|||||||
@ -15,22 +15,16 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.cf;
|
package org.thingsboard.server.service.cf;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.OutputType;
|
import org.thingsboard.server.common.data.cf.configuration.OutputType;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public final class CalculatedFieldResult {
|
public final class CalculatedFieldResult {
|
||||||
|
|
||||||
private final OutputType type;
|
private final OutputType type;
|
||||||
private final AttributeScope scope;
|
private final AttributeScope scope;
|
||||||
private final Map<String, Object> resultMap;
|
private final JsonNode result;
|
||||||
|
|
||||||
public CalculatedFieldResult(OutputType type, AttributeScope scope, Map<String, Object> resultMap) {
|
|
||||||
this.type = type;
|
|
||||||
this.scope = scope;
|
|
||||||
this.resultMap = resultMap == null ? Map.of() : Map.copyOf(resultMap);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.cf;
|
package org.thingsboard.server.service.cf;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
@ -155,8 +153,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
|||||||
OutputType type = calculatedFieldResult.getType();
|
OutputType type = calculatedFieldResult.getType();
|
||||||
TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
|
TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
|
||||||
TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
|
TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
|
||||||
ObjectNode payload = createJsonPayload(calculatedFieldResult);
|
TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(calculatedFieldResult.getResult())).build();
|
||||||
TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build();
|
|
||||||
clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() {
|
clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||||
@ -283,13 +280,6 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
|
|||||||
return new StringDataEntry(key, defaultValue);
|
return new StringDataEntry(key, defaultValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) {
|
|
||||||
ObjectNode payload = JacksonUtil.newObjectNode();
|
|
||||||
Map<String, Object> resultMap = calculatedFieldResult.getResultMap();
|
|
||||||
resultMap.forEach((k, v) -> payload.set(k, JacksonUtil.convertValue(v, JsonNode.class)));
|
|
||||||
return payload;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CalculatedFieldState createStateByType(CalculatedFieldCtx ctx) {
|
private CalculatedFieldState createStateByType(CalculatedFieldCtx ctx) {
|
||||||
return switch (ctx.getCfType()) {
|
return switch (ctx.getCfType()) {
|
||||||
case SIMPLE -> new SimpleCalculatedFieldState(ctx.getArgNames());
|
case SIMPLE -> new SimpleCalculatedFieldState(ctx.getArgNames());
|
||||||
|
|||||||
@ -18,18 +18,12 @@ package org.thingsboard.server.service.cf.ctx.state;
|
|||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public interface CalculatedFieldScriptEngine {
|
public interface CalculatedFieldScriptEngine {
|
||||||
|
|
||||||
ListenableFuture<Object> executeScriptAsync(Object[] args);
|
ListenableFuture<Object> executeScriptAsync(Object[] args);
|
||||||
|
|
||||||
ListenableFuture<Map<String, Object>> executeToMapAsync(Object[] args);
|
|
||||||
|
|
||||||
ListenableFuture<JsonNode> executeJsonAsync(Object[] args);
|
ListenableFuture<JsonNode> executeJsonAsync(Object[] args);
|
||||||
|
|
||||||
ListenableFuture<Map<String, Object>> executeToMapTransform(Object result);
|
|
||||||
|
|
||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import io.jsonwebtoken.lang.Collections;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.script.api.ScriptType;
|
import org.thingsboard.script.api.ScriptType;
|
||||||
@ -27,7 +26,6 @@ import org.thingsboard.script.api.tbel.TbelInvokeService;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
|
||||||
import javax.script.ScriptException;
|
import javax.script.ScriptException;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
@ -72,24 +70,11 @@ public class CalculatedFieldTbelScriptEngine implements CalculatedFieldScriptEng
|
|||||||
}, MoreExecutors.directExecutor());
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Map<String, Object>> executeToMapAsync(Object[] args) {
|
|
||||||
return Futures.transformAsync(executeScriptAsync(args), this::executeToMapTransform, MoreExecutors.directExecutor());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<JsonNode> executeJsonAsync(Object[] args) {
|
public ListenableFuture<JsonNode> executeJsonAsync(Object[] args) {
|
||||||
return Futures.transform(executeScriptAsync(args), JacksonUtil::valueToTree, MoreExecutors.directExecutor());
|
return Futures.transform(executeScriptAsync(args), JacksonUtil::valueToTree, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<Map<String, Object>> executeToMapTransform(Object result) {
|
|
||||||
if (result instanceof Map) {
|
|
||||||
return Futures.immediateFuture((Map<String, Object>) result);
|
|
||||||
}
|
|
||||||
throw new IllegalArgumentException("Wrong result type: [" + result.getClass().getName() + "]");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
tbelInvokeService.release(this.scriptId);
|
tbelInvokeService.release(this.scriptId);
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.cf.ctx.state;
|
package org.thingsboard.server.service.cf.ctx.state;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
@ -27,7 +28,6 @@ import org.thingsboard.server.common.data.cf.configuration.Output;
|
|||||||
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
import org.thingsboard.server.service.cf.CalculatedFieldResult;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -53,7 +53,7 @@ public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
.map(this::toTbelArgument)
|
.map(this::toTbelArgument)
|
||||||
.toArray();
|
.toArray();
|
||||||
|
|
||||||
ListenableFuture<Map<String, Object>> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(args);
|
ListenableFuture<JsonNode> resultFuture = ctx.getCalculatedFieldScriptEngine().executeJsonAsync(args);
|
||||||
Output output = ctx.getOutput();
|
Output output = ctx.getOutput();
|
||||||
return Futures.transform(resultFuture,
|
return Futures.transform(resultFuture,
|
||||||
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
|
result -> new CalculatedFieldResult(output.getType(), output.getScope(), result),
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||||
import org.thingsboard.server.common.data.cf.configuration.Output;
|
import org.thingsboard.server.common.data.cf.configuration.Output;
|
||||||
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
import org.thingsboard.server.common.data.kv.BasicKvEntry;
|
||||||
@ -63,7 +64,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
|
|||||||
double expressionResult = expr.evaluate();
|
double expressionResult = expr.evaluate();
|
||||||
|
|
||||||
Output output = ctx.getOutput();
|
Output output = ctx.getOutput();
|
||||||
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), Map.of(output.getName(), expressionResult)));
|
return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), JacksonUtil.valueToTree(Map.of(output.getName(), expressionResult))));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.test.context.SpringBootTest;
|
import org.springframework.boot.test.context.SpringBootTest;
|
||||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.script.api.tbel.DefaultTbelInvokeService;
|
import org.thingsboard.script.api.tbel.DefaultTbelInvokeService;
|
||||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
@ -127,7 +128,7 @@ public class ScriptCalculatedFieldStateTest {
|
|||||||
Output output = getCalculatedFieldConfig().getOutput();
|
Output output = getCalculatedFieldConfig().getOutput();
|
||||||
assertThat(result.getType()).isEqualTo(output.getType());
|
assertThat(result.getType()).isEqualTo(output.getType());
|
||||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||||
assertThat(result.getResultMap()).isEqualTo(Map.of("maxDeviceTemperature", 17.0, "assetHumidity", 43.0));
|
assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("maxDeviceTemperature", 17.0, "assetHumidity", 43.0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||||
@ -137,7 +138,7 @@ public class SimpleCalculatedFieldStateTest {
|
|||||||
Output output = getCalculatedFieldConfig().getOutput();
|
Output output = getCalculatedFieldConfig().getOutput();
|
||||||
assertThat(result.getType()).isEqualTo(output.getType());
|
assertThat(result.getType()).isEqualTo(output.getType());
|
||||||
assertThat(result.getScope()).isEqualTo(output.getScope());
|
assertThat(result.getScope()).isEqualTo(output.getScope());
|
||||||
assertThat(result.getResultMap()).isEqualTo(Map.of("output", 49.0));
|
assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("output", 49.0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user