Separate Dispatchers for system and tenant rules/plugins
This commit is contained in:
parent
7d4da490fe
commit
af5e52dd97
@ -32,10 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.*;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
|
||||||
import org.thingsboard.server.common.data.id.PluginId;
|
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKey;
|
import org.thingsboard.server.common.data.kv.AttributeKey;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
@ -74,6 +71,10 @@ public final class PluginProcessingContext implements PluginContext {
|
|||||||
this.securityCtx = Optional.ofNullable(securityCtx);
|
this.securityCtx = Optional.ofNullable(securityCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void persistError(String method, Exception e) {
|
||||||
|
pluginCtx.persistError(method, e);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendPluginRpcMsg(RpcMsg msg) {
|
public void sendPluginRpcMsg(RpcMsg msg) {
|
||||||
this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
|
this.pluginCtx.rpcService.tell(new PluginRpcMsg(pluginCtx.tenantId, pluginCtx.pluginId, msg));
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||||
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
|
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
|
||||||
@ -73,6 +74,10 @@ public final class SharedPluginProcessingContext {
|
|||||||
return pluginId;
|
return pluginId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TenantId getPluginTenantId() {
|
||||||
|
return tenantId;
|
||||||
|
}
|
||||||
|
|
||||||
public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
|
public void toDeviceActor(DeviceAttributesEventNotificationMsg msg) {
|
||||||
forward(msg.getDeviceId(), msg, rpcService::tell);
|
forward(msg.getDeviceId(), msg, rpcService::tell);
|
||||||
}
|
}
|
||||||
@ -105,6 +110,10 @@ public final class SharedPluginProcessingContext {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void persistError(String method, Exception e) {
|
||||||
|
systemContext.persistError(tenantId, pluginId, method, e);
|
||||||
|
}
|
||||||
|
|
||||||
public ActorRef self() {
|
public ActorRef self() {
|
||||||
return currentActor;
|
return currentActor;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -69,8 +69,10 @@ public class DefaultActorService implements ActorService {
|
|||||||
|
|
||||||
public static final String APP_DISPATCHER_NAME = "app-dispatcher";
|
public static final String APP_DISPATCHER_NAME = "app-dispatcher";
|
||||||
public static final String CORE_DISPATCHER_NAME = "core-dispatcher";
|
public static final String CORE_DISPATCHER_NAME = "core-dispatcher";
|
||||||
public static final String RULE_DISPATCHER_NAME = "rule-dispatcher";
|
public static final String SYSTEM_RULE_DISPATCHER_NAME = "system-rule-dispatcher";
|
||||||
public static final String PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
|
public static final String SYSTEM_PLUGIN_DISPATCHER_NAME = "system-plugin-dispatcher";
|
||||||
|
public static final String TENANT_RULE_DISPATCHER_NAME = "rule-dispatcher";
|
||||||
|
public static final String TENANT_PLUGIN_DISPATCHER_NAME = "plugin-dispatcher";
|
||||||
public static final String SESSION_DISPATCHER_NAME = "session-dispatcher";
|
public static final String SESSION_DISPATCHER_NAME = "session-dispatcher";
|
||||||
public static final String RPC_DISPATCHER_NAME = "rpc-dispatcher";
|
public static final String RPC_DISPATCHER_NAME = "rpc-dispatcher";
|
||||||
|
|
||||||
|
|||||||
@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.actors.plugin.PluginActor;
|
import org.thingsboard.server.actors.plugin.PluginActor;
|
||||||
import org.thingsboard.server.actors.service.ContextAwareActor;
|
import org.thingsboard.server.actors.service.ContextAwareActor;
|
||||||
import org.thingsboard.server.actors.service.DefaultActorService;
|
|
||||||
import org.thingsboard.server.common.data.id.PluginId;
|
import org.thingsboard.server.common.data.id.PluginId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||||
@ -60,10 +59,12 @@ public abstract class PluginManager {
|
|||||||
|
|
||||||
abstract TenantId getTenantId();
|
abstract TenantId getTenantId();
|
||||||
|
|
||||||
|
abstract String getDispatcherName();
|
||||||
|
|
||||||
public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
|
public ActorRef getOrCreatePluginActor(ActorContext context, PluginId pluginId) {
|
||||||
return pluginActors.computeIfAbsent(pluginId, pId ->
|
return pluginActors.computeIfAbsent(pluginId, pId ->
|
||||||
context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
|
context.actorOf(Props.create(new PluginActor.ActorCreator(systemContext, getTenantId(), pId))
|
||||||
.withDispatcher(DefaultActorService.PLUGIN_DISPATCHER_NAME), pId.toString()));
|
.withDispatcher(getDispatcherName()), pId.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void broadcast(Object msg) {
|
public void broadcast(Object msg) {
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.actors.shared.plugin;
|
package org.thingsboard.server.actors.shared.plugin;
|
||||||
|
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.actors.service.DefaultActorService;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
||||||
import org.thingsboard.server.common.data.plugin.PluginMetaData;
|
import org.thingsboard.server.common.data.plugin.PluginMetaData;
|
||||||
@ -37,4 +38,8 @@ public class SystemPluginManager extends PluginManager {
|
|||||||
return BasePluginService.SYSTEM_TENANT;
|
return BasePluginService.SYSTEM_TENANT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getDispatcherName() {
|
||||||
|
return DefaultActorService.SYSTEM_PLUGIN_DISPATCHER_NAME;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.actors.shared.plugin;
|
package org.thingsboard.server.actors.shared.plugin;
|
||||||
|
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.actors.service.DefaultActorService;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
||||||
import org.thingsboard.server.common.data.plugin.PluginMetaData;
|
import org.thingsboard.server.common.data.plugin.PluginMetaData;
|
||||||
@ -38,4 +39,10 @@ public class TenantPluginManager extends PluginManager {
|
|||||||
TenantId getTenantId() {
|
TenantId getTenantId() {
|
||||||
return tenantId;
|
return tenantId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getDispatcherName() {
|
||||||
|
return DefaultActorService.TENANT_PLUGIN_DISPATCHER_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -100,10 +100,12 @@ public abstract class RuleManager {
|
|||||||
|
|
||||||
abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
|
abstract FetchFunction<RuleMetaData> getFetchRulesFunction();
|
||||||
|
|
||||||
|
abstract String getDispatcherName();
|
||||||
|
|
||||||
public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
|
public ActorRef getOrCreateRuleActor(ActorContext context, RuleId ruleId) {
|
||||||
return ruleActors.computeIfAbsent(ruleId, rId ->
|
return ruleActors.computeIfAbsent(ruleId, rId ->
|
||||||
context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
|
context.actorOf(Props.create(new RuleActor.ActorCreator(systemContext, tenantId, rId))
|
||||||
.withDispatcher(DefaultActorService.RULE_DISPATCHER_NAME), rId.toString()));
|
.withDispatcher(getDispatcherName()), rId.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public RuleActorChain getRuleChain() {
|
public RuleActorChain getRuleChain() {
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.actors.shared.rule;
|
package org.thingsboard.server.actors.shared.rule;
|
||||||
|
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.actors.service.DefaultActorService;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
||||||
import org.thingsboard.server.common.data.rule.RuleMetaData;
|
import org.thingsboard.server.common.data.rule.RuleMetaData;
|
||||||
@ -32,4 +33,8 @@ public class SystemRuleManager extends RuleManager {
|
|||||||
return ruleService::findSystemRules;
|
return ruleService::findSystemRules;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getDispatcherName() {
|
||||||
|
return DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
package org.thingsboard.server.actors.shared.rule;
|
package org.thingsboard.server.actors.shared.rule;
|
||||||
|
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.actors.service.DefaultActorService;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
|
||||||
import org.thingsboard.server.common.data.rule.RuleMetaData;
|
import org.thingsboard.server.common.data.rule.RuleMetaData;
|
||||||
@ -31,4 +32,9 @@ public class TenantRuleManager extends RuleManager {
|
|||||||
return link -> ruleService.findTenantRules(tenantId, link);
|
return link -> ruleService.findTenantRules(tenantId, link);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getDispatcherName() {
|
||||||
|
return DefaultActorService.TENANT_RULE_DISPATCHER_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -92,7 +92,53 @@ core-dispatcher {
|
|||||||
throughput = 5
|
throughput = 5
|
||||||
}
|
}
|
||||||
|
|
||||||
# This dispatcher is used for rule actors
|
# This dispatcher is used for system rule actors
|
||||||
|
system-rule-dispatcher {
|
||||||
|
type = Dispatcher
|
||||||
|
executor = "fork-join-executor"
|
||||||
|
fork-join-executor {
|
||||||
|
# Min number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-min = 2
|
||||||
|
# Max number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-max = 12
|
||||||
|
|
||||||
|
# The parallelism factor is used to determine thread pool size using the
|
||||||
|
# following formula: ceil(available processors * factor). Resulting size
|
||||||
|
# is then bounded by the parallelism-min and parallelism-max values.
|
||||||
|
parallelism-factor = 0.25
|
||||||
|
}
|
||||||
|
# How long time the dispatcher will wait for new actors until it shuts down
|
||||||
|
shutdown-timeout = 1s
|
||||||
|
|
||||||
|
# Throughput defines the number of messages that are processed in a batch
|
||||||
|
# before the thread is returned to the pool. Set to 1 for as fair as possible.
|
||||||
|
throughput = 5
|
||||||
|
}
|
||||||
|
|
||||||
|
# This dispatcher is used for system plugin actors
|
||||||
|
system-plugin-dispatcher {
|
||||||
|
type = Dispatcher
|
||||||
|
executor = "fork-join-executor"
|
||||||
|
fork-join-executor {
|
||||||
|
# Min number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-min = 2
|
||||||
|
# Max number of threads to cap factor-based parallelism number to
|
||||||
|
parallelism-max = 12
|
||||||
|
|
||||||
|
# The parallelism factor is used to determine thread pool size using the
|
||||||
|
# following formula: ceil(available processors * factor). Resulting size
|
||||||
|
# is then bounded by the parallelism-min and parallelism-max values.
|
||||||
|
parallelism-factor = 0.25
|
||||||
|
}
|
||||||
|
# How long time the dispatcher will wait for new actors until it shuts down
|
||||||
|
shutdown-timeout = 1s
|
||||||
|
|
||||||
|
# Throughput defines the number of messages that are processed in a batch
|
||||||
|
# before the thread is returned to the pool. Set to 1 for as fair as possible.
|
||||||
|
throughput = 5
|
||||||
|
}
|
||||||
|
|
||||||
|
# This dispatcher is used for tenant rule actors
|
||||||
rule-dispatcher {
|
rule-dispatcher {
|
||||||
type = Dispatcher
|
type = Dispatcher
|
||||||
executor = "fork-join-executor"
|
executor = "fork-join-executor"
|
||||||
@ -115,7 +161,7 @@ rule-dispatcher {
|
|||||||
throughput = 5
|
throughput = 5
|
||||||
}
|
}
|
||||||
|
|
||||||
# This dispatcher is used for rule actors
|
# This dispatcher is used for tenant plugin actors
|
||||||
plugin-dispatcher {
|
plugin-dispatcher {
|
||||||
type = Dispatcher
|
type = Dispatcher
|
||||||
executor = "fork-join-executor"
|
executor = "fork-join-executor"
|
||||||
|
|||||||
@ -16,10 +16,7 @@
|
|||||||
package org.thingsboard.server.extensions.api.plugins;
|
package org.thingsboard.server.extensions.api.plugins;
|
||||||
|
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.id.CustomerId;
|
import org.thingsboard.server.common.data.id.*;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
|
||||||
import org.thingsboard.server.common.data.id.PluginId;
|
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvQuery;
|
import org.thingsboard.server.common.data.kv.TsKvQuery;
|
||||||
@ -46,6 +43,8 @@ public interface PluginContext {
|
|||||||
|
|
||||||
Optional<PluginApiCallSecurityContext> getSecurityCtx();
|
Optional<PluginApiCallSecurityContext> getSecurityCtx();
|
||||||
|
|
||||||
|
void persistError(String method, Exception e);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Device RPC API
|
Device RPC API
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -33,6 +33,9 @@ import org.thingsboard.server.extensions.core.action.mail.SendMailActionMsg;
|
|||||||
import javax.mail.MessagingException;
|
import javax.mail.MessagingException;
|
||||||
import javax.mail.internet.MimeMessage;
|
import javax.mail.internet.MimeMessage;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Andrew Shvayka
|
* @author Andrew Shvayka
|
||||||
@ -41,6 +44,9 @@ import java.util.Properties;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implements RuleMsgHandler {
|
public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implements RuleMsgHandler {
|
||||||
|
|
||||||
|
//TODO: Add logic to close this executor on shutdown.
|
||||||
|
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
|
||||||
private MailPluginConfiguration configuration;
|
private MailPluginConfiguration configuration;
|
||||||
private JavaMailSenderImpl mailSender;
|
private JavaMailSenderImpl mailSender;
|
||||||
|
|
||||||
@ -84,12 +90,14 @@ public class MailPlugin extends AbstractPlugin<MailPluginConfiguration> implemen
|
|||||||
@Override
|
@Override
|
||||||
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
|
public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
|
||||||
if (msg.getPayload() instanceof SendMailActionMsg) {
|
if (msg.getPayload() instanceof SendMailActionMsg) {
|
||||||
|
executor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
sendMail((SendMailActionMsg) msg.getPayload());
|
sendMail((SendMailActionMsg) msg.getPayload());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("Failed to send email", e);
|
log.warn("[{}] Failed to send email", ctx.getPluginId(), e);
|
||||||
throw new RuleException("Failed to send email", e);
|
ctx.persistError("Failed to send email", e);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
throw new RuntimeException("Not supported msg type: " + msg.getPayload().getClass() + "!");
|
throw new RuntimeException("Not supported msg type: " + msg.getPayload().getClass() + "!");
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user