Fixed AlarmNode, UI: Config forms for Alarm, Log and ToEmail nodes.

This commit is contained in:
Igor Kulikov 2018-04-13 19:32:02 +03:00
parent 037e65b380
commit 8bc49896ed
16 changed files with 191 additions and 83 deletions

View File

@ -59,6 +59,7 @@ import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.script.JsExecutorService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
@ -173,6 +174,10 @@ public class ActorSystemContext {
@Getter
private MailExecutorService mailExecutor;
@Autowired
@Getter
private DbCallbackExecutorService dbCallbackExecutor;
@Autowired
@Getter
private MailService mailService;

View File

@ -137,6 +137,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getMailExecutor();
}
@Override
public ListeningExecutor getDbCallbackExecutor() {
return mainCtx.getDbCallbackExecutor();
}
@Override
public ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames) {
return new NashornJsEngine(script, functionName, argNames);

View File

@ -253,6 +253,13 @@ public class RuleChainController extends BaseController {
Set<String> states = engine.executeSwitch(inMsg);
output = objectMapper.writeValueAsString(states);
break;
case "json":
JsonNode json = engine.executeJson(inMsg);
output = objectMapper.writeValueAsString(json);
break;
case "string":
output = engine.executeToString(inMsg);
break;
default:
throw new IllegalArgumentException("Unsupported script type: " + scriptType);
}

View File

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.executors;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* Created by igor on 4/13/18.
*/
public abstract class AbstractListeningExecutor implements ListeningExecutor {
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(getThreadPollSize()));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
}
@Override
public void execute(Runnable command) {
service.execute(command);
}
protected abstract int getThreadPollSize();
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.executors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class DbCallbackExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.db_callback_thread_pool_size}")
private int dbCallbackExecutorThreadPoolSize;
@Override
protected int getThreadPollSize() {
return dbCallbackExecutorThreadPoolSize;
}
}

View File

@ -15,40 +15,19 @@
*/
package org.thingsboard.server.service.mail;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.thingsboard.server.service.executors.AbstractListeningExecutor;
@Component
public class MailExecutorService implements ListeningExecutor {
public class MailExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.mail_thread_pool_size}")
private int mailExecutorThreadPoolSize;
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(mailExecutorThreadPoolSize));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
protected int getThreadPollSize() {
return mailExecutorThreadPoolSize;
}
}

View File

@ -15,41 +15,19 @@
*/
package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.ListeningExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.thingsboard.server.service.executors.AbstractListeningExecutor;
@Component
public class JsExecutorService implements ListeningExecutor{
public class JsExecutorService extends AbstractListeningExecutor {
@Value("${actors.rule.js_thread_pool_size}")
private int jsExecutorThreadPoolSize;
private ListeningExecutorService service;
@PostConstruct
public void init() {
this.service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(jsExecutorThreadPoolSize));
}
@PreDestroy
public void destroy() {
if (this.service != null) {
this.service.shutdown();
}
}
@Override
public <T> ListenableFuture<T> executeAsync(Callable<T> task) {
return service.submit(task);
protected int getThreadPollSize() {
return jsExecutorThreadPoolSize;
}
}

View File

@ -153,6 +153,10 @@ public class NashornJsEngine implements org.thingsboard.rule.engine.api.ScriptEn
@Override
public String executeToString(TbMsg msg) throws ScriptException {
JsonNode result = executeScript(msg);
if (!result.isTextual()) {
log.warn("Wrong result type: {}", result.getNodeType());
throw new ScriptException("Wrong result type: " + result.getNodeType());
}
return result.asText();
}

View File

@ -219,8 +219,11 @@ actors:
termination.delay: "${ACTORS_RULE_TERMINATION_DELAY:30000}"
# Errors for particular actor are persisted once per specified amount of milliseconds
error_persist_frequency: "${ACTORS_RULE_ERROR_FREQUENCY:3000}"
# Specify thread pool size for database request callbacks executor service
db_callback_thread_pool_size: "${ACTORS_RULE_DB_CALLBACK_THREAD_POOL_SIZE:1}"
# Specify thread pool size for javascript executor service
js_thread_pool_size: "${ACTORS_RULE_JS_THREAD_POOL_SIZE:10}"
# Specify thread pool size for mail sender executor service
mail_thread_pool_size: "${ACTORS_RULE_MAIL_THREAD_POOL_SIZE:10}"
chain:
# Errors for particular actor are persisted once per specified amount of milliseconds

View File

@ -18,8 +18,9 @@ package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
public interface ListeningExecutor {
public interface ListeningExecutor extends Executor {
<T> ListenableFuture<T> executeAsync(Callable<T> task);

View File

@ -88,6 +88,8 @@ public interface TbContext {
ListeningExecutor getMailExecutor();
ListeningExecutor getDbCallbackExecutor();
MailService getMailService();
ScriptEngine createJsScriptEngine(String script, String functionName, String... argNames);

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -32,6 +32,8 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.util.concurrent.ExecutorService;
import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
@Slf4j
@ -47,7 +49,8 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
"If alarm was not created, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'matadata' will contains one of those properties 'isNewAlarm/isExistingAlarm/isClearedAlarm' " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"})
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeAlarmConfig")
public class TbAlarmNode implements TbNode {
@ -81,7 +84,7 @@ public class TbAlarmNode implements TbNode {
} else {
return checkForClearIfExist(ctx, msg);
}
});
}, ctx.getDbCallbackExecutor());
withCallback(transform,
alarmResult -> {
@ -107,7 +110,7 @@ public class TbAlarmNode implements TbNode {
} else {
return updateAlarm(ctx, msg, a);
}
});
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<AlarmResult> checkForClearIfExist(TbContext ctx, TbMsg msg) {
@ -117,12 +120,14 @@ public class TbAlarmNode implements TbNode {
return clearAlarm(ctx, msg, a);
}
return Futures.immediateFuture(new AlarmResult(false, false, false, null));
});
}, ctx.getDbCallbackExecutor());
}
private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg), (Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm, (Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm));
ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg),
(Function<JsonNode, Alarm>) details -> buildAlarm(msg, details, ctx.getTenantId()));
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
(Function<Alarm, Alarm>) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
return Futures.transform(asyncCreated, (Function<Alarm, AlarmResult>) alarm -> new AlarmResult(true, false, false, alarm));
}
@ -133,7 +138,7 @@ public class TbAlarmNode implements TbNode {
alarm.setDetails(details);
alarm.setEndTs(System.currentTimeMillis());
return ctx.getAlarmService().createOrUpdateAlarm(alarm);
});
}, ctx.getDbCallbackExecutor());
return Futures.transform(asyncUpdated, (Function<Alarm, AlarmResult>) a -> new AlarmResult(false, true, false, a));
}

View File

@ -31,7 +31,9 @@ import static org.thingsboard.rule.engine.DonAsynchron.withCallback;
nodeDescription = "Log incoming messages using JS script for transformation Message into String",
nodeDetails = "Transform incoming Message with configured JS condition to String and log final value. " +
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>" +
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>")
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbActionNodeLogConfig")
public class TbLogNode implements TbNode {

View File

@ -40,7 +40,9 @@ import static org.thingsboard.rule.engine.mail.TbSendEmailNode.SEND_EMAIL_TYPE;
configClazz = TbMsgToEmailNodeConfiguration.class,
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ")
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbTransformationNodeToEmailConfig")
public class TbMsgToEmailNode implements TbNode {
private static final ObjectMapper MAPPER = new ObjectMapper();

View File

@ -62,6 +62,7 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
vm.isEditingRuleNodeLink = false;
vm.isLibraryOpen = true;
vm.enableHotKeys = true;
Object.defineProperty(vm, 'isLibraryOpenReadonly', {
get: function() { return vm.isLibraryOpen },
@ -327,66 +328,80 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
description: $translate.instant('rulenode.select-all-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
vm.modelservice.selectAll();
}
}
})
.add({
combo: 'ctrl+c',
description: $translate.instant('rulenode.copy-selected'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
copyRuleNodes();
}
}
})
.add({
combo: 'ctrl+v',
description: $translate.instant('action.paste'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
if (itembuffer.hasRuleNodes()) {
pasteRuleNodes();
}
}
}
})
.add({
combo: 'esc',
description: $translate.instant('rulenode.deselect-all-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
event.stopPropagation();
vm.modelservice.deselectAll();
}
}
})
.add({
combo: 'ctrl+s',
description: $translate.instant('action.apply'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
vm.saveRuleChain();
}
}
})
.add({
combo: 'ctrl+z',
description: $translate.instant('action.decline-changes'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
vm.revertRuleChain();
}
}
})
.add({
combo: 'del',
description: $translate.instant('rulenode.delete-selected-objects'),
allowIn: ['INPUT', 'SELECT', 'TEXTAREA'],
callback: function (event) {
if (vm.enableHotKeys) {
event.preventDefault();
vm.modelservice.deleteSelected();
}
}
})
}
@ -574,6 +589,7 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
$scope.$watch(function() {
return vm.isEditingRuleNode || vm.isEditingRuleNodeLink;
}, (val) => {
vm.enableHotKeys = !val;
updateErrorTooltips(val);
});
@ -605,12 +621,15 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
}
} else {
var labels = ruleChainService.getRuleNodeSupportedLinks(sourceNode.component);
vm.enableHotKeys = false;
addRuleNodeLink(event, edge, labels).then(
(link) => {
deferred.resolve(link);
vm.enableHotKeys = true;
},
() => {
deferred.reject();
vm.enableHotKeys = true;
}
);
}
@ -1159,6 +1178,8 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
var ruleChainId = vm.ruleChain.id ? vm.ruleChain.id.id : null;
vm.enableHotKeys = false;
$mdDialog.show({
controller: 'AddRuleNodeController',
controllerAs: 'vm',
@ -1188,7 +1209,9 @@ export function RuleChainController($state, $scope, $compile, $q, $mdUtil, $time
}
vm.ruleChainModel.nodes.push(ruleNode);
updateRuleNodesHighlight();
vm.enableHotKeys = true;
}, function () {
vm.enableHotKeys = true;
});
}