added implementation for script type

This commit is contained in:
IrynaMatveieva 2024-11-21 15:45:39 +02:00
parent 40299cab1a
commit c75603f57c
10 changed files with 248 additions and 63 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -26,8 +27,6 @@ public interface CalculatedFieldExecutionService {
void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback);
void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, String> updatedTelemetry);
// void onEntityProfileUpdate(TransportProtos.CalculatedFieldEntityProfileUpdateMsgProto proto, TbCallback callback);
void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry);
}

View File

@ -25,7 +25,7 @@ public class CalculatedFieldResult {
private String type;
private AttributeScope scope;
private Map<String, String> resultMap;
private Map<String, Object> resultMap;
public CalculatedFieldResult() {
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.cf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@ -30,6 +31,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
@ -90,6 +92,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private final TimeseriesService timeseriesService;
private final RocksDBService rocksDBService;
private final TbClusterService clusterService;
private final TbelInvokeService tbelInvokeService;
private ListeningExecutorService calculatedFieldExecutor;
private ListeningExecutorService calculatedFieldCallbackExecutor;
@ -201,7 +204,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
@Override
public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, String> updatedTelemetry) {
public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) {
try {
CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id));
updateOrInitializeState(calculatedField, calculatedField.getEntityId(), updatedTelemetry);
@ -266,13 +269,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) {
Map<String, Argument> arguments = calculatedField.getConfiguration().getArguments();
Map<String, String> argumentValues = new HashMap<>();
Map<String, KvEntry> argumentValues = new HashMap<>();
AtomicInteger remaining = new AtomicInteger(arguments.size());
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() {
@Override
public void onSuccess(Optional<? extends KvEntry> result) {
String value = result.map(KvEntry::getValueAsString).orElse(argument.getDefaultValue());
argumentValues.put(key, value);
// todo: should be rewritten implementation for default value
argumentValues.put(key, result.orElse(null));
if (remaining.decrementAndGet() == 0) {
updateOrInitializeState(calculatedField, entityId, argumentValues);
}
@ -300,7 +303,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
};
}
private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, String> argumentValues) {
private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, KvEntry> argumentValues) {
CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(calculatedField.getUuidId(), entityId.getId());
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(ctxId, null));
@ -314,10 +317,21 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
states.put(ctxId, calculatedFieldCtx);
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx));
CalculatedFieldResult result = state.performCalculation(calculatedField.getConfiguration());
if (result != null) {
pushMsgToRuleEngine(calculatedField.getTenantId(), calculatedField.getEntityId(), result);
}
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedField.getTenantId(), calculatedField.getConfiguration(), tbelInvokeService);
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
if (result != null) {
pushMsgToRuleEngine(calculatedField.getTenantId(), calculatedField.getEntityId(), result);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedField.getId(), entityId, t);
}
}, MoreExecutors.directExecutor());
}
private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) {
@ -325,8 +339,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
String type = calculatedFieldResult.getType();
TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
ObjectNode jsonNodes = createJsonPayload(calculatedFieldResult);
TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(jsonNodes));
ObjectNode payload = createJsonPayload(calculatedFieldResult);
TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload));
clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null);
} catch (Exception e) {
log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e);
@ -334,9 +348,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) {
ObjectNode jsonNodes = JacksonUtil.newObjectNode();
calculatedFieldResult.getResultMap().forEach(jsonNodes::put);
return jsonNodes;
ObjectNode payload = JacksonUtil.newObjectNode();
Map<String, Object> resultMap = calculatedFieldResult.getResultMap();
resultMap.forEach((k, v) -> payload.set(k, JacksonUtil.convertValue(v, JsonNode.class)));
return payload;
}
private CalculatedFieldState createStateByType(CalculatedFieldType calculatedFieldType) {

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.kv.KvEntry;
import java.util.Map;
public interface CalculatedFieldScriptEngine {
ListenableFuture<Object> executeScriptAsync(Map<String, KvEntry> arguments);
void destroy();
}

View File

@ -18,9 +18,13 @@ package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.Map;
@ -39,12 +43,12 @@ public interface CalculatedFieldState {
@JsonIgnore
CalculatedFieldType getType();
default boolean isValid(Map<String, String> argumentValues, Map<String, Argument> arguments) {
default boolean isValid(Map<String, KvEntry> argumentValues, Map<String, Argument> arguments) {
return argumentValues.keySet().containsAll(arguments.keySet());
}
void initState(Map<String, String> argumentValues);
void initState(Map<String, KvEntry> argumentValues);
CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration);
ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService);
}

View File

@ -0,0 +1,90 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.ScriptType;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import javax.script.ScriptException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@Slf4j
public class CalculatedFieldTbelScriptEngine implements CalculatedFieldScriptEngine {
private final TbelInvokeService tbelInvokeService;
private final UUID scriptId;
private final TenantId tenantId;
public CalculatedFieldTbelScriptEngine(TenantId tenantId, TbelInvokeService tbelInvokeService, String script, String... argNames) {
this.tenantId = tenantId;
this.tbelInvokeService = tbelInvokeService;
try {
this.scriptId = this.tbelInvokeService.eval(tenantId, ScriptType.CALCULATED_FIELD_SCRIPT, script, argNames).get();
} catch (Exception e) {
Throwable t = e;
if (e instanceof ExecutionException) {
t = e.getCause();
}
throw new IllegalArgumentException("Can't compile script: " + t.getMessage(), t);
}
}
@Override
public ListenableFuture<Object> executeScriptAsync(Map<String, KvEntry> arguments) {
log.trace("execute script async, arguments {}", arguments);
Object[] args = new Object[arguments.size()];
int index = 0;
for (KvEntry entry : arguments.values()) {
switch (entry.getDataType()) {
case BOOLEAN -> args[index] = entry.getBooleanValue().orElse(null);
case DOUBLE -> args[index] = entry.getDoubleValue().orElse(null);
case LONG -> args[index] = entry.getLongValue().orElse(null);
case JSON -> args[index] = entry.getJsonValue().map(JacksonUtil::toJsonNode).orElse(null);
default -> args[index] = entry.getValueAsString();
}
index++;
}
return Futures.transformAsync(tbelInvokeService.invokeScript(tenantId, null, this.scriptId, args),
o -> {
try {
return Futures.immediateFuture(o);
} catch (Exception e) {
if (e.getCause() instanceof ScriptException) {
return Futures.immediateFailedFuture(e.getCause());
} else if (e.getCause() instanceof RuntimeException) {
return Futures.immediateFailedFuture(new ScriptException(e.getCause().getMessage()));
} else {
return Futures.immediateFailedFuture(new ScriptException(e));
}
}
}, MoreExecutors.directExecutor());
}
@Override
public void destroy() {
tbelInvokeService.release(this.scriptId);
}
}

View File

@ -15,10 +15,19 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap;
@ -28,7 +37,10 @@ import java.util.Map;
@Slf4j
public class ScriptCalculatedFieldState implements CalculatedFieldState {
private Map<String, String> arguments = new HashMap<>();
@JsonIgnore
private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
private Map<String, KvEntry> arguments = new HashMap<>();
public ScriptCalculatedFieldState() {
}
@ -39,7 +51,7 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState {
}
@Override
public void initState(Map<String, String> argumentValues) {
public void initState(Map<String, KvEntry> argumentValues) {
if (arguments == null) {
this.arguments = new HashMap<>();
}
@ -47,9 +59,46 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState {
}
@Override
public CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration) {
// TODO: implement
return null;
public ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
if (tbelInvokeService == null) {
throw new IllegalArgumentException("TBEL script engine is disabled!");
}
if (calculatedFieldScriptEngine == null) {
initEngine(tenantId, calculatedFieldConfiguration, tbelInvokeService);
}
ListenableFuture<Object> resultFuture = calculatedFieldScriptEngine.executeScriptAsync(arguments);
return Futures.transform(resultFuture, result -> {
Output output = calculatedFieldConfiguration.getOutput();
Map<String, Object> resultMap = new HashMap<>();
if (result instanceof Map<?, ?>) {
Map<String, Object> map = JacksonUtil.convertValue(result, Map.class);
if (map != null) {
resultMap.putAll(map);
}
} else {
resultMap.put(output.getName(), JacksonUtil.convertValue(result, Object.class));
}
CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult();
calculatedFieldResult.setType(output.getType());
calculatedFieldResult.setScope(output.getScope());
calculatedFieldResult.setResultMap(resultMap);
return calculatedFieldResult;
}, MoreExecutors.directExecutor());
}
private void initEngine(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine(
tenantId,
tbelInvokeService,
calculatedFieldConfiguration.getOutput().getExpression(),
arguments.keySet().toArray(new String[0])
);
}
}

View File

@ -15,13 +15,18 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap;
@ -30,8 +35,7 @@ import java.util.Map;
@Data
public class SimpleCalculatedFieldState implements CalculatedFieldState {
// TODO: use value object(TsKv) instead of string
private Map<String, String> arguments;
private Map<String, KvEntry> arguments;
@Override
public CalculatedFieldType getType() {
@ -39,7 +43,7 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState {
}
@Override
public void initState(Map<String, String> argumentValues) {
public void initState(Map<String, KvEntry> argumentValues) {
if (arguments == null) {
arguments = new HashMap<>();
}
@ -47,7 +51,7 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState {
}
@Override
public CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration) {
public ListenableFuture<CalculatedFieldResult> performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
Output output = calculatedFieldConfiguration.getOutput();
Map<String, Argument> arguments = calculatedFieldConfiguration.getArguments();
@ -64,19 +68,17 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState {
customExpression.set(expr);
}
Map<String, Double> variables = new HashMap<>();
this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v)));
this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v.getValueAsString())));
expr.setVariables(variables);
String expressionResult = String.valueOf(expr.evaluate());
double expressionResult = expr.evaluate();
result.setType(output.getType());
result.setScope(output.getScope());
result.setResultMap(Map.of(output.getName(), expressionResult));
return result;
return Futures.immediateFuture(result);
}
return null;
// TODO: handle what happens when not valid
}
}

View File

@ -69,6 +69,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 27.03.18.
@ -207,32 +208,28 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
Map<String, String> attributes = link.getConfiguration().getAttributes();
Map<String, String> timeSeries = link.getConfiguration().getTimeSeries();
List<? extends KvEntry> filteredTelemetry = telemetry.stream()
Map<String, KvEntry> updatedTelemetry = telemetry.stream()
.filter(entry -> attributes.containsValue(entry.getKey()) || timeSeries.containsValue(entry.getKey()))
.toList();
Map<String, String> updatedTelemetry = new HashMap<>();
for (KvEntry telemetryEntry : filteredTelemetry) {
String key = telemetryEntry.getKey();
if (telemetryEntry instanceof AttributeKvEntry) {
for (Map.Entry<String, String> attribute : attributes.entrySet()) {
if (telemetryEntry.getKey().equals(attribute.getValue())) {
key = attribute.getKey();
break;
}
}
}
if (telemetryEntry instanceof TsKvEntry) {
for (Map.Entry<String, String> timeSeriesEntry : timeSeries.entrySet()) {
if (telemetryEntry.getKey().equals(timeSeriesEntry.getValue())) {
key = timeSeriesEntry.getKey();
break;
}
}
}
updatedTelemetry.put(key, telemetryEntry.getValueAsString());
}
.collect(Collectors.toMap(
entry -> {
if (entry instanceof AttributeKvEntry) {
return attributes.entrySet().stream()
.filter(attr -> attr.getValue().equals(entry.getKey()))
.map(Map.Entry::getKey)
.findFirst()
.orElse(entry.getKey());
} else if (entry instanceof TsKvEntry) {
return timeSeries.entrySet().stream()
.filter(ts -> ts.getValue().equals(entry.getKey()))
.map(Map.Entry::getKey)
.findFirst()
.orElse(entry.getKey());
}
return entry.getKey();
},
entry -> entry,
(v1, v2) -> v1
));
if (!updatedTelemetry.isEmpty()) {
calculatedFieldExecutionService.onTelemetryUpdate(tenantId, calculatedFieldId, updatedTelemetry);

View File

@ -16,5 +16,5 @@
package org.thingsboard.script.api;
public enum ScriptType {
RULE_NODE_SCRIPT
RULE_NODE_SCRIPT, CALCULATED_FIELD_SCRIPT
}