diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index eecf979a70..fa82d27971 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -33,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.RuleChainTransactionService; @@ -89,6 +90,7 @@ import java.io.StringWriter; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Component @@ -286,6 +288,21 @@ public class ActorSystemContext { @Getter private long statisticsPersistFrequency; + @Getter + private final AtomicInteger jsInvokeRequestsCount = new AtomicInteger(0); + @Getter + private final AtomicInteger jsInvokeResponsesCount = new AtomicInteger(0); + @Getter + private final AtomicInteger jsInvokeFailuresCount = new AtomicInteger(0); + + @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") + public void printStats() { + if (statisticsEnabled) { + log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]", + jsInvokeRequestsCount.getAndSet(0), jsInvokeResponsesCount.getAndSet(0), jsInvokeFailuresCount.getAndSet(0)); + } + } + @Value("${actors.tenant.create_components_on_init}") @Getter private boolean tenantComponentsInitEnabled; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ae60262817..e5e326e597 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -229,6 +229,27 @@ class DefaultTbContext implements TbContext { return new RuleNodeJsScriptEngine(mainCtx.getJsSandbox(), nodeCtx.getSelf().getId(), script, argNames); } + @Override + public void logJsEvalRequest() { + if (mainCtx.isStatisticsEnabled()) { + mainCtx.getJsInvokeRequestsCount().incrementAndGet(); + } + } + + @Override + public void logJsEvalResponse() { + if (mainCtx.isStatisticsEnabled()) { + mainCtx.getJsInvokeResponsesCount().incrementAndGet(); + } + } + + @Override + public void logJsEvalFailure() { + if (mainCtx.isStatisticsEnabled()) { + mainCtx.getJsInvokeFailuresCount().incrementAndGet(); + } + } + @Override public String getNodeId() { return mainCtx.getNodeIdProvider().getNodeId(); diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java index 99051389c8..0bd1eb047d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingService.java @@ -131,15 +131,21 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService { private void addNode(ServerInstance instance) { for (int i = 0; i < virtualNodesSize; i++) { circles[instance.getServerAddress().getServerType().ordinal()].put(hash(instance, i).asLong(), instance); +// circles[instance.getServerAddress().getServerType().ordinal()].put(classic(instance, i), instance); } } private void removeNode(ServerInstance instance) { for (int i = 0; i < virtualNodesSize; i++) { circles[instance.getServerAddress().getServerType().ordinal()].remove(hash(instance, i).asLong()); +// circles[instance.getServerAddress().getServerType().ordinal()].remove(classic(instance, i)); } } + private long classic(ServerInstance instance, int i) { + return (instance.getHost() + instance.getPort() + i).hashCode() * (Long.MAX_VALUE / Integer.MAX_VALUE); + } + private HashCode hash(ServerInstance instance, int i) { return hashFunction.newHasher().putString(instance.getHost(), MiscUtils.UTF8).putInt(instance.getPort()).putInt(i).hash(); } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 4f73a59961..5eb339ae02 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.gen.js.JsInvokeProtos; import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; @@ -29,13 +30,14 @@ import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaRequestTemplate; import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.kafka.TbNodeIdProvider; -import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; @Slf4j @ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true) @@ -70,6 +72,25 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { @Value("${js.remote.max_errors}") private int maxErrors; + @Value("${js.remote.stats.enabled:false}") + private boolean statsEnabled; + + private final AtomicInteger kafkaPushedMsgs = new AtomicInteger(0); + private final AtomicInteger kafkaInvokeMsgs = new AtomicInteger(0); + private final AtomicInteger kafkaEvalMsgs = new AtomicInteger(0); + private final AtomicInteger kafkaFailedMsgs = new AtomicInteger(0); + + @Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}") + public void printStats() { + if (statsEnabled) { + int invokeMsgs = kafkaInvokeMsgs.getAndSet(0); + int evalMsgs = kafkaEvalMsgs.getAndSet(0); + int failed = kafkaFailedMsgs.getAndSet(0); + log.info("Kafka JS Invoke Stats: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}]", + kafkaPushedMsgs.getAndSet(0), invokeMsgs + evalMsgs, invokeMsgs, evalMsgs, failed); + } + } + private TbKafkaRequestTemplate kafkaTemplate; private Map scriptIdToBodysMap = new ConcurrentHashMap<>(); @@ -139,14 +160,17 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { log.trace("Post compile request for scriptId [{}]", scriptId); ListenableFuture future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); + kafkaPushedMsgs.incrementAndGet(); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); if (compilationResult.getSuccess()) { + kafkaEvalMsgs.incrementAndGet(); scriptIdToNameMap.put(scriptId, functionName); scriptIdToBodysMap.put(scriptId, scriptBody); return compiledScriptId; } else { + kafkaFailedMsgs.incrementAndGet(); log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails()); } @@ -174,12 +198,16 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { .setInvokeRequest(jsRequestBuilder.build()) .build(); + ListenableFuture future = kafkaTemplate.post(scriptId.toString(), jsRequestWrapper); + kafkaPushedMsgs.incrementAndGet(); return Futures.transform(future, response -> { JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); if (invokeResult.getSuccess()) { + kafkaInvokeMsgs.incrementAndGet(); return invokeResult.getResult(); } else { + kafkaFailedMsgs.incrementAndGet(); log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); throw new RuntimeException(invokeResult.getErrorDetails()); } diff --git a/application/src/main/java/org/thingsboard/server/utils/MiscUtils.java b/application/src/main/java/org/thingsboard/server/utils/MiscUtils.java index 7a4648f234..67fbf15be5 100644 --- a/application/src/main/java/org/thingsboard/server/utils/MiscUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/MiscUtils.java @@ -19,6 +19,7 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import java.nio.charset.Charset; +import java.util.Random; /** diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 8fd418a153..92977ba42f 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -246,6 +246,7 @@ actors: statistics: # Enable/disable actor statistics enabled: "${ACTORS_STATISTICS_ENABLED:true}" + js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}" persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" queue: # Enable/disable persistence of un-processed messages to the queue @@ -467,6 +468,9 @@ js: response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" + stats: + enabled: "${TB_JS_REMOTE_STATS_ENABLED:false}" + print_interval_ms: "${TB_JS_REMOTE_STATS_PRINT_INTERVAL_MS:10000}" transport: type: "${TRANSPORT_TYPE:local}" # local or remote diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingServiceTest.java new file mode 100644 index 0000000000..ff45a4737d --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentClusterRoutingServiceTest.java @@ -0,0 +1,118 @@ +/** + * Copyright © 2016-2019 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cluster.routing; + +import com.datastax.driver.core.utils.UUIDs; +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.UUIDConverter; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.common.msg.cluster.ServerType; +import org.thingsboard.server.service.cluster.discovery.DiscoveryService; +import org.thingsboard.server.service.cluster.discovery.ServerInstance; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Slf4j +@RunWith(MockitoJUnitRunner.class) +public class ConsistentClusterRoutingServiceTest { + + private ConsistentClusterRoutingService clusterRoutingService; + + private DiscoveryService discoveryService; + + private String hashFunctionName = "murmur3_128"; + private Integer virtualNodesSize = 1024*64; + private ServerAddress currentServer = new ServerAddress(" 100.96.1.0", 9001, ServerType.CORE); + + + @Before + public void setup() throws Exception { + discoveryService = mock(DiscoveryService.class); + clusterRoutingService = new ConsistentClusterRoutingService(); + ReflectionTestUtils.setField(clusterRoutingService, "discoveryService", discoveryService); + ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); + ReflectionTestUtils.setField(clusterRoutingService, "virtualNodesSize", virtualNodesSize); + when(discoveryService.getCurrentServer()).thenReturn(new ServerInstance(currentServer)); + List otherServers = new ArrayList<>(); + for (int i = 1; i < 30; i++) { + otherServers.add(new ServerInstance(new ServerAddress(" 100.96." + i*2 + "." + i, 9001, ServerType.CORE))); + } + when(discoveryService.getOtherServers()).thenReturn(otherServers); + clusterRoutingService.init(); + } + + @Test + public void testDispersionOnMillionDevices() { + List devices = new ArrayList<>(); + for (int i = 0; i < 1000000; i++) { + devices.add(new DeviceId(UUIDs.timeBased())); + } + + testDevicesDispersion(devices); + } + + @Test + public void testDispersionOnDevicesFromFile() throws IOException { + List deviceIdsStrList = Files.readAllLines(Paths.get("/home/ashvayka/Downloads/deviceIds.out")); + List devices = deviceIdsStrList.stream().map(String::trim).filter(s -> !s.isEmpty()).map(UUIDConverter::fromString).map(DeviceId::new).collect(Collectors.toList()); + System.out.println("Devices: " + devices.size()); + testDevicesDispersion(devices); + testDevicesDispersion(devices); + testDevicesDispersion(devices); + testDevicesDispersion(devices); + testDevicesDispersion(devices); + + } + + private void testDevicesDispersion(List devices) { + long start = System.currentTimeMillis(); + Map map = new HashMap<>(); + for (DeviceId deviceId : devices) { + ServerAddress address = clusterRoutingService.resolveById(deviceId).orElse(currentServer); + map.put(address, map.getOrDefault(address, 0) + 1); + } + + List> data = map.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getValue)).collect(Collectors.toList()); + long end = System.currentTimeMillis(); + System.out.println("Size: " + virtualNodesSize + " Time: " + (end - start) + " Diff: " + (data.get(data.size() - 1).getValue() - data.get(0).getValue())); + + for (Map.Entry entry : data) { +// System.out.println(entry.getKey().getHost() + ": " + entry.getValue()); + } + + } + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index a1902d3131..d91f12da96 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -123,6 +123,12 @@ public interface TbContext { ScriptEngine createJsScriptEngine(String script, String... argNames); + void logJsEvalRequest(); + + void logJsEvalResponse(); + + void logJsEvalFailure(); + String getNodeId(); RuleChainTransactionService getRuleChainTransactionService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java index e94f236a74..0a7f6113f1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java @@ -65,8 +65,10 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { + ctx.logJsEvalRequest(); ListenableFuture asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails()); return Futures.transformAsync(asyncDetails, details -> { + ctx.logJsEvalResponse(); ListenableFuture clearFuture = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), details, System.currentTimeMillis()); return Futures.transformAsync(clearFuture, cleared -> { ListenableFuture savedAlarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), alarm.getId()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java index dcf7b6987d..d6f6236311 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java @@ -42,10 +42,10 @@ import java.io.IOException; nodeDescription = "Create or Update Alarm", nodeDetails = "Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" + - "Node output:\n" + - "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm'. " + - "Message payload can be accessed via msg property. For example 'temperature = ' + msg.temperature ;. " + - "Message metadata can be accessed via metadata property. For example 'name = ' + metadata.customerName;.", + "Node output:\n" + + "If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm'. " + + "Message payload can be accessed via msg property. For example 'temperature = ' + msg.temperature ;. " + + "Message metadata can be accessed via metadata property. For example 'name = ' + metadata.customerName;.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodeCreateAlarmConfig", icon = "notifications_active" @@ -103,11 +103,15 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) { ListenableFuture asyncAlarm; - if (msgAlarm != null ) { + if (msgAlarm != null) { asyncAlarm = Futures.immediateCheckedFuture(msgAlarm); } else { + ctx.logJsEvalRequest(); asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null), - details -> buildAlarm(msg, details, ctx.getTenantId())); + details -> { + ctx.logJsEvalResponse(); + return buildAlarm(msg, details, ctx.getTenantId()); + }); } ListenableFuture asyncCreated = Futures.transform(asyncAlarm, alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor()); @@ -115,7 +119,9 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) { + ctx.logJsEvalRequest(); ListenableFuture asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg, existingAlarm.getDetails()), (Function) details -> { + ctx.logJsEvalResponse(); if (msgAlarm != null) { existingAlarm.setSeverity(msgAlarm.getSeverity()); existingAlarm.setPropagate(msgAlarm.isPropagate()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java index a11a4bf18b..f3fd84788c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbLogNode.java @@ -53,12 +53,17 @@ public class TbLogNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { ListeningExecutor jsExecutor = ctx.getJsExecutor(); + ctx.logJsEvalRequest(); withCallback(jsExecutor.executeAsync(() -> jsEngine.executeToString(msg)), toString -> { + ctx.logJsEvalResponse(); log.info(toString); ctx.tellNext(msg, SUCCESS); }, - t -> ctx.tellFailure(msg, t)); + t -> { + ctx.logJsEvalResponse(); + ctx.tellFailure(msg, t); + }); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 478b0b4666..d5ac552595 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -129,7 +129,9 @@ public class TbMsgGeneratorNode implements TbNode { prevMsg = ctx.newMsg("", originatorId, new TbMsgMetaData(), "{}"); } if (initialized) { + ctx.logJsEvalRequest(); TbMsg generated = jsEngine.executeGenerate(prevMsg); + ctx.logJsEvalResponse(); prevMsg = ctx.newMsg(generated.getType(), originatorId, generated.getMetaData(), generated.getData()); } return prevMsg; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java index c23043bbb0..a9b287fa4c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsFilterNode.java @@ -52,9 +52,16 @@ public class TbJsFilterNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { ListeningExecutor jsExecutor = ctx.getJsExecutor(); + ctx.logJsEvalRequest(); withCallback(jsExecutor.executeAsync(() -> jsEngine.executeFilter(msg)), - filterResult -> ctx.tellNext(msg, filterResult ? "True" : "False"), - t -> ctx.tellFailure(msg, t)); + filterResult -> { + ctx.logJsEvalResponse(); + ctx.tellNext(msg, filterResult ? "True" : "False"); + }, + t -> { + ctx.tellFailure(msg, t); + ctx.logJsEvalFailure(); + }, ctx.getDbCallbackExecutor()); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java index 0122b9fed7..5ba4d2b0dd 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbJsSwitchNode.java @@ -54,9 +54,16 @@ public class TbJsSwitchNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { ListeningExecutor jsExecutor = ctx.getJsExecutor(); + ctx.logJsEvalRequest(); withCallback(jsExecutor.executeAsync(() -> jsEngine.executeSwitch(msg)), - result -> processSwitch(ctx, msg, result), - t -> ctx.tellFailure(msg, t)); + result -> { + ctx.logJsEvalResponse(); + processSwitch(ctx, msg, result); + }, + t -> { + ctx.logJsEvalFailure(); + ctx.tellFailure(msg, t); + }, ctx.getDbCallbackExecutor()); } private void processSwitch(TbContext ctx, TbMsg msg, Set nextRelations) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java index 0e14a31a63..b1f282b742 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbAbstractTransformNode.java @@ -44,14 +44,21 @@ public abstract class TbAbstractTransformNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { withCallback(transform(ctx, msg), - m -> { - if (m != null) { - ctx.tellNext(m, SUCCESS); - } else { - ctx.tellNext(msg, FAILURE); - } - }, - t -> ctx.tellFailure(msg, t)); + m -> transformSuccess(ctx, msg, m), + t -> transformFailure(ctx, msg, t), + ctx.getDbCallbackExecutor()); + } + + protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) { + ctx.tellFailure(msg, t); + } + + protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) { + if (m != null) { + ctx.tellNext(m, SUCCESS); + } else { + ctx.tellNext(msg, FAILURE); + } } protected abstract ListenableFuture transform(TbContext ctx, TbMsg msg); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java index 81c4483166..a9239ae924 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/transform/TbTransformMsgNode.java @@ -21,6 +21,9 @@ import org.thingsboard.rule.engine.api.*; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; +import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; +import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; + @RuleNode( type = ComponentType.TRANSFORMATION, name = "script", @@ -49,9 +52,22 @@ public class TbTransformMsgNode extends TbAbstractTransformNode { @Override protected ListenableFuture transform(TbContext ctx, TbMsg msg) { + ctx.logJsEvalRequest(); return ctx.getJsExecutor().executeAsync(() -> jsEngine.executeUpdate(msg)); } + @Override + protected void transformSuccess(TbContext ctx, TbMsg msg, TbMsg m) { + ctx.logJsEvalResponse(); + super.transformSuccess(ctx, msg, m); + } + + @Override + protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) { + ctx.logJsEvalFailure(); + super.transformFailure(ctx, msg, t); + } + @Override public void destroy() { if (jsEngine != null) {