diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 9a8b26d074..d4e0d1da84 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; @@ -26,8 +27,6 @@ public interface CalculatedFieldExecutionService { void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback); - void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry); - -// void onEntityProfileUpdate(TransportProtos.CalculatedFieldEntityProfileUpdateMsgProto proto, TbCallback callback); + void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java index 4982445735..87f1d08a84 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java @@ -25,7 +25,7 @@ public class CalculatedFieldResult { private String type; private AttributeScope scope; - private Map resultMap; + private Map resultMap; public CalculatedFieldResult() { } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 250496830c..c83bd7a71a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -30,6 +31,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; @@ -90,6 +92,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private final TimeseriesService timeseriesService; private final RocksDBService rocksDBService; private final TbClusterService clusterService; + private final TbelInvokeService tbelInvokeService; private ListeningExecutorService calculatedFieldExecutor; private ListeningExecutorService calculatedFieldCallbackExecutor; @@ -201,7 +204,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { + public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { try { CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id)); updateOrInitializeState(calculatedField, calculatedField.getEntityId(), updatedTelemetry); @@ -266,13 +269,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) { Map arguments = calculatedField.getConfiguration().getArguments(); - Map argumentValues = new HashMap<>(); + Map argumentValues = new HashMap<>(); AtomicInteger remaining = new AtomicInteger(arguments.size()); arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(tenantId, argument), new FutureCallback<>() { @Override public void onSuccess(Optional result) { - String value = result.map(KvEntry::getValueAsString).orElse(argument.getDefaultValue()); - argumentValues.put(key, value); + // todo: should be rewritten implementation for default value + argumentValues.put(key, result.orElse(null)); if (remaining.decrementAndGet() == 0) { updateOrInitializeState(calculatedField, entityId, argumentValues); } @@ -300,7 +303,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }; } - private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map argumentValues) { + private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map argumentValues) { CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(calculatedField.getUuidId(), entityId.getId()); CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(ctxId, null)); @@ -314,10 +317,21 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.put(ctxId, calculatedFieldCtx); rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx)); - CalculatedFieldResult result = state.performCalculation(calculatedField.getConfiguration()); - if (result != null) { - pushMsgToRuleEngine(calculatedField.getTenantId(), calculatedField.getEntityId(), result); - } + ListenableFuture resultFuture = state.performCalculation(calculatedField.getTenantId(), calculatedField.getConfiguration(), tbelInvokeService); + Futures.addCallback(resultFuture, new FutureCallback<>() { + @Override + public void onSuccess(CalculatedFieldResult result) { + if (result != null) { + pushMsgToRuleEngine(calculatedField.getTenantId(), calculatedField.getEntityId(), result); + } + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedField.getId(), entityId, t); + } + }, MoreExecutors.directExecutor()); + } private void pushMsgToRuleEngine(TenantId tenantId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult) { @@ -325,8 +339,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas String type = calculatedFieldResult.getType(); TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; - ObjectNode jsonNodes = createJsonPayload(calculatedFieldResult); - TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(jsonNodes)); + ObjectNode payload = createJsonPayload(calculatedFieldResult); + TbMsg msg = TbMsg.newMsg(msgType, originatorId, md, JacksonUtil.writeValueAsString(payload)); clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); } catch (Exception e) { log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e); @@ -334,9 +348,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { - ObjectNode jsonNodes = JacksonUtil.newObjectNode(); - calculatedFieldResult.getResultMap().forEach(jsonNodes::put); - return jsonNodes; + ObjectNode payload = JacksonUtil.newObjectNode(); + Map resultMap = calculatedFieldResult.getResultMap(); + resultMap.forEach((k, v) -> payload.set(k, JacksonUtil.convertValue(v, JsonNode.class))); + return payload; } private CalculatedFieldState createStateByType(CalculatedFieldType calculatedFieldType) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java new file mode 100644 index 0000000000..6b54536019 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.kv.KvEntry; + +import java.util.Map; + +public interface CalculatedFieldScriptEngine { + + ListenableFuture executeScriptAsync(Map arguments); + + void destroy(); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index c25a6960ac..dffcf09820 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -18,9 +18,13 @@ package org.thingsboard.server.service.cf.ctx.state; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.Map; @@ -39,12 +43,12 @@ public interface CalculatedFieldState { @JsonIgnore CalculatedFieldType getType(); - default boolean isValid(Map argumentValues, Map arguments) { + default boolean isValid(Map argumentValues, Map arguments) { return argumentValues.keySet().containsAll(arguments.keySet()); } - void initState(Map argumentValues); + void initState(Map argumentValues); - CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration); + ListenableFuture performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java new file mode 100644 index 0000000000..7e8376be8e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java @@ -0,0 +1,90 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.script.api.ScriptType; +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; + +import javax.script.ScriptException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +@Slf4j +public class CalculatedFieldTbelScriptEngine implements CalculatedFieldScriptEngine { + + private final TbelInvokeService tbelInvokeService; + + private final UUID scriptId; + private final TenantId tenantId; + + public CalculatedFieldTbelScriptEngine(TenantId tenantId, TbelInvokeService tbelInvokeService, String script, String... argNames) { + this.tenantId = tenantId; + this.tbelInvokeService = tbelInvokeService; + try { + this.scriptId = this.tbelInvokeService.eval(tenantId, ScriptType.CALCULATED_FIELD_SCRIPT, script, argNames).get(); + } catch (Exception e) { + Throwable t = e; + if (e instanceof ExecutionException) { + t = e.getCause(); + } + throw new IllegalArgumentException("Can't compile script: " + t.getMessage(), t); + } + } + + @Override + public ListenableFuture executeScriptAsync(Map arguments) { + log.trace("execute script async, arguments {}", arguments); + Object[] args = new Object[arguments.size()]; + int index = 0; + for (KvEntry entry : arguments.values()) { + switch (entry.getDataType()) { + case BOOLEAN -> args[index] = entry.getBooleanValue().orElse(null); + case DOUBLE -> args[index] = entry.getDoubleValue().orElse(null); + case LONG -> args[index] = entry.getLongValue().orElse(null); + case JSON -> args[index] = entry.getJsonValue().map(JacksonUtil::toJsonNode).orElse(null); + default -> args[index] = entry.getValueAsString(); + } + index++; + } + return Futures.transformAsync(tbelInvokeService.invokeScript(tenantId, null, this.scriptId, args), + o -> { + try { + return Futures.immediateFuture(o); + } catch (Exception e) { + if (e.getCause() instanceof ScriptException) { + return Futures.immediateFailedFuture(e.getCause()); + } else if (e.getCause() instanceof RuntimeException) { + return Futures.immediateFailedFuture(new ScriptException(e.getCause().getMessage())); + } else { + return Futures.immediateFailedFuture(new ScriptException(e)); + } + } + }, MoreExecutors.directExecutor()); + } + + @Override + public void destroy() { + tbelInvokeService.release(this.scriptId); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 238e8005f2..363c237df5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -15,10 +15,19 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -28,7 +37,10 @@ import java.util.Map; @Slf4j public class ScriptCalculatedFieldState implements CalculatedFieldState { - private Map arguments = new HashMap<>(); + @JsonIgnore + private CalculatedFieldScriptEngine calculatedFieldScriptEngine; + + private Map arguments = new HashMap<>(); public ScriptCalculatedFieldState() { } @@ -39,7 +51,7 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState { } @Override - public void initState(Map argumentValues) { + public void initState(Map argumentValues) { if (arguments == null) { this.arguments = new HashMap<>(); } @@ -47,9 +59,46 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState { } @Override - public CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration) { - // TODO: implement - return null; + public ListenableFuture performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) { + if (tbelInvokeService == null) { + throw new IllegalArgumentException("TBEL script engine is disabled!"); + } + + if (calculatedFieldScriptEngine == null) { + initEngine(tenantId, calculatedFieldConfiguration, tbelInvokeService); + } + + ListenableFuture resultFuture = calculatedFieldScriptEngine.executeScriptAsync(arguments); + + return Futures.transform(resultFuture, result -> { + Output output = calculatedFieldConfiguration.getOutput(); + Map resultMap = new HashMap<>(); + + if (result instanceof Map) { + Map map = JacksonUtil.convertValue(result, Map.class); + if (map != null) { + resultMap.putAll(map); + } + } else { + resultMap.put(output.getName(), JacksonUtil.convertValue(result, Object.class)); + } + + CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult(); + calculatedFieldResult.setType(output.getType()); + calculatedFieldResult.setScope(output.getScope()); + calculatedFieldResult.setResultMap(resultMap); + + return calculatedFieldResult; + }, MoreExecutors.directExecutor()); + } + + private void initEngine(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) { + calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine( + tenantId, + tbelInvokeService, + calculatedFieldConfiguration.getOutput().getExpression(), + arguments.keySet().toArray(new String[0]) + ); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index e984e300c6..725da6c7a7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -15,13 +15,18 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; import net.objecthunter.exp4j.Expression; import net.objecthunter.exp4j.ExpressionBuilder; +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; import java.util.HashMap; @@ -30,8 +35,7 @@ import java.util.Map; @Data public class SimpleCalculatedFieldState implements CalculatedFieldState { - // TODO: use value object(TsKv) instead of string - private Map arguments; + private Map arguments; @Override public CalculatedFieldType getType() { @@ -39,7 +43,7 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState { } @Override - public void initState(Map argumentValues) { + public void initState(Map argumentValues) { if (arguments == null) { arguments = new HashMap<>(); } @@ -47,7 +51,7 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState { } @Override - public CalculatedFieldResult performCalculation(CalculatedFieldConfiguration calculatedFieldConfiguration) { + public ListenableFuture performCalculation(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) { Output output = calculatedFieldConfiguration.getOutput(); Map arguments = calculatedFieldConfiguration.getArguments(); @@ -64,19 +68,17 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState { customExpression.set(expr); } Map variables = new HashMap<>(); - this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v))); + this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v.getValueAsString()))); expr.setVariables(variables); - String expressionResult = String.valueOf(expr.evaluate()); + double expressionResult = expr.evaluate(); result.setType(output.getType()); result.setScope(output.getScope()); result.setResultMap(Map.of(output.getName(), expressionResult)); - return result; + return Futures.immediateFuture(result); } - return null; - // TODO: handle what happens when not valid } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index ac769bee9e..76c1383f6a 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -69,6 +69,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * Created by ashvayka on 27.03.18. @@ -207,32 +208,28 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); Map attributes = link.getConfiguration().getAttributes(); Map timeSeries = link.getConfiguration().getTimeSeries(); - List filteredTelemetry = telemetry.stream() + Map updatedTelemetry = telemetry.stream() .filter(entry -> attributes.containsValue(entry.getKey()) || timeSeries.containsValue(entry.getKey())) - .toList(); - - - Map updatedTelemetry = new HashMap<>(); - for (KvEntry telemetryEntry : filteredTelemetry) { - String key = telemetryEntry.getKey(); - if (telemetryEntry instanceof AttributeKvEntry) { - for (Map.Entry attribute : attributes.entrySet()) { - if (telemetryEntry.getKey().equals(attribute.getValue())) { - key = attribute.getKey(); - break; - } - } - } - if (telemetryEntry instanceof TsKvEntry) { - for (Map.Entry timeSeriesEntry : timeSeries.entrySet()) { - if (telemetryEntry.getKey().equals(timeSeriesEntry.getValue())) { - key = timeSeriesEntry.getKey(); - break; - } - } - } - updatedTelemetry.put(key, telemetryEntry.getValueAsString()); - } + .collect(Collectors.toMap( + entry -> { + if (entry instanceof AttributeKvEntry) { + return attributes.entrySet().stream() + .filter(attr -> attr.getValue().equals(entry.getKey())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(entry.getKey()); + } else if (entry instanceof TsKvEntry) { + return timeSeries.entrySet().stream() + .filter(ts -> ts.getValue().equals(entry.getKey())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(entry.getKey()); + } + return entry.getKey(); + }, + entry -> entry, + (v1, v2) -> v1 + )); if (!updatedTelemetry.isEmpty()) { calculatedFieldExecutionService.onTelemetryUpdate(tenantId, calculatedFieldId, updatedTelemetry); diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/ScriptType.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/ScriptType.java index cdcdf815d0..7f8c513957 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/ScriptType.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/ScriptType.java @@ -16,5 +16,5 @@ package org.thingsboard.script.api; public enum ScriptType { - RULE_NODE_SCRIPT + RULE_NODE_SCRIPT, CALCULATED_FIELD_SCRIPT }