diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 091b504ea7..a59e85cf59 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -61,6 +61,7 @@ import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; +import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.mail.MailExecutorService; @@ -186,6 +187,10 @@ public class ActorSystemContext { @Getter private MailExecutorService mailExecutor; + @Autowired + @Getter + private ClusterRpcCallbackExecutorService clusterRpcCallbackExecutor; + @Autowired @Getter private DbCallbackExecutorService dbCallbackExecutor; diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java index 760d4a6d96..669ed74b86 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java @@ -17,10 +17,12 @@ package org.thingsboard.server.actors.rpc; import akka.actor.ActorRef; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.rpc.GrpcSession; import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener; +import org.thingsboard.server.service.executors.ClusterRpcCallbackExecutorService; /** * @author Andrew Shvayka @@ -28,12 +30,14 @@ import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener; @Slf4j public class BasicRpcSessionListener implements GrpcSessionListener { + private final ClusterRpcCallbackExecutorService callbackExecutorService; private final ActorService service; private final ActorRef manager; private final ActorRef self; - BasicRpcSessionListener(ActorService service, ActorRef manager, ActorRef self) { - this.service = service; + BasicRpcSessionListener(ActorSystemContext context, ActorRef manager, ActorRef self) { + this.service = context.getActorService(); + this.callbackExecutorService = context.getClusterRpcCallbackExecutor(); this.manager = manager; this.self = self; } @@ -55,7 +59,13 @@ public class BasicRpcSessionListener implements GrpcSessionListener { @Override public void onReceiveClusterGrpcMsg(GrpcSession session, ClusterAPIProtos.ClusterMessage clusterMessage) { log.trace("Received session actor msg from [{}][{}]: {}", session.getRemoteServer(), getType(session), clusterMessage); - service.onReceivedMsg(session.getRemoteServer(), clusterMessage); + callbackExecutorService.execute(() -> { + try { + service.onReceivedMsg(session.getRemoteServer(), clusterMessage); + } catch (Exception e) { + log.debug("[{}][{}] Failed to process cluster message: {}", session.getRemoteServer(), getType(session), clusterMessage, e); + } + }); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java index 9fa087e630..b2931f85e1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java @@ -16,7 +16,9 @@ package org.thingsboard.server.actors.rpc; import akka.actor.ActorRef; +import akka.actor.OneForOneStrategy; import akka.actor.Props; +import akka.actor.SupervisorStrategy; import akka.event.Logging; import akka.event.LoggingAdapter; import lombok.extern.slf4j.Slf4j; @@ -30,6 +32,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.cluster.ServerType; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.discovery.ServerInstance; +import scala.concurrent.duration.Duration; import java.util.*; @@ -39,9 +42,7 @@ import java.util.*; public class RpcManagerActor extends ContextAwareActor { private final Map sessionActors; - private final Map> pendingMsgs; - private final ServerAddress instance; private RpcManagerActor(ActorSystemContext systemContext) { @@ -63,7 +64,7 @@ public class RpcManagerActor extends ContextAwareActor { } @Override - public void onReceive(Object msg) throws Exception { + public void onReceive(Object msg) { if (msg instanceof ClusterAPIProtos.ClusterMessage) { onMsg((ClusterAPIProtos.ClusterMessage) msg); } else if (msg instanceof RpcBroadcastMsg) { @@ -163,6 +164,7 @@ public class RpcManagerActor extends ContextAwareActor { log.info("[{}] session closed. Should reconnect: {}", remoteAddress, reconnect); SessionActorInfo sessionRef = sessionActors.get(remoteAddress); if (sessionRef != null && context().sender() != null && context().sender().equals(sessionRef.actor)) { + context().stop(sessionRef.actor); sessionActors.remove(remoteAddress); pendingMsgs.remove(remoteAddress); if (reconnect) { @@ -172,9 +174,13 @@ public class RpcManagerActor extends ContextAwareActor { } private void onCreateSessionRequest(RpcSessionCreateRequestMsg msg) { - ActorRef actorRef = createSessionActor(msg); if (msg.getRemoteAddress() != null) { - register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef); + if (!sessionActors.containsKey(msg.getRemoteAddress())) { + ActorRef actorRef = createSessionActor(msg); + register(msg.getRemoteAddress(), msg.getMsgUid(), actorRef); + } + } else { + createSessionActor(msg); } } @@ -193,7 +199,8 @@ public class RpcManagerActor extends ContextAwareActor { private ActorRef createSessionActor(RpcSessionCreateRequestMsg msg) { log.info("[{}] Creating session actor.", msg.getMsgUid()); ActorRef actor = context().actorOf( - Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME)); + Props.create(new RpcSessionActor.ActorCreator(systemContext, msg.getMsgUid())) + .withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME)); actor.tell(msg, context().self()); return actor; } @@ -210,4 +217,14 @@ public class RpcManagerActor extends ContextAwareActor { return new RpcManagerActor(context); } } + + @Override + public SupervisorStrategy supervisorStrategy() { + return strategy; + } + + private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { + log.warn("Unknown failure", t); + return SupervisorStrategy.resume(); + }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java index 86509ca08e..2ca5c3e872 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java @@ -15,12 +15,10 @@ */ package org.thingsboard.server.actors.rpc; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -38,15 +36,15 @@ import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CO /** * @author Andrew Shvayka */ +@Slf4j public class RpcSessionActor extends ContextAwareActor { - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); private final UUID sessionId; private GrpcSession session; private GrpcSessionListener listener; - public RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) { + private RpcSessionActor(ActorSystemContext systemContext, UUID sessionId) { super(systemContext); this.sessionId = sessionId; } @@ -58,7 +56,7 @@ public class RpcSessionActor extends ContextAwareActor { } @Override - public void onReceive(Object msg) throws Exception { + public void onReceive(Object msg) { if (msg instanceof ClusterAPIProtos.ClusterMessage) { tell((ClusterAPIProtos.ClusterMessage) msg); } else if (msg instanceof RpcSessionCreateRequestMsg) { @@ -67,19 +65,29 @@ public class RpcSessionActor extends ContextAwareActor { } private void tell(ClusterAPIProtos.ClusterMessage msg) { - session.sendMsg(msg); + if (session != null) { + session.sendMsg(msg); + } else { + log.trace("Failed to send message due to missing session!"); + } } @Override public void postStop() { - log.info("Closing session -> {}", session.getRemoteServer()); - session.close(); + if (session != null) { + log.info("Closing session -> {}", session.getRemoteServer()); + try { + session.close(); + } catch (RuntimeException e) { + log.trace("Failed to close session!", e); + } + } } private void initSession(RpcSessionCreateRequestMsg msg) { log.info("[{}] Initializing session", context().self()); ServerAddress remoteServer = msg.getRemoteAddress(); - listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self()); + listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self()); if (msg.getRemoteAddress() == null) { // Server session session = new GrpcSession(listener); @@ -113,7 +121,7 @@ public class RpcSessionActor extends ContextAwareActor { } @Override - public RpcSessionActor create() throws Exception { + public RpcSessionActor create() { return new RpcSessionActor(context, sessionId); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java index 1c7e2d2664..a53b88e20f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java @@ -37,7 +37,7 @@ public abstract class ContextAwareActor extends UntypedActor { } @Override - public void onReceive(Object msg) throws Exception { + public void onReceive(Object msg) { if (log.isDebugEnabled()) { log.debug("Processing msg: {}", msg); } diff --git a/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java new file mode 100644 index 0000000000..2def8b0e1e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/executors/ClusterRpcCallbackExecutorService.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.executors; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class ClusterRpcCallbackExecutorService extends AbstractListeningExecutor { + + @Value("${actors.cluster.grpc_callback_thread_pool_size}") + private int grpcCallbackExecutorThreadPoolSize; + + @Override + protected int getThreadPollSize() { + return grpcCallbackExecutorThreadPoolSize; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java index cc8ecee1d6..c78d7df578 100644 --- a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java +++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; import java.util.Collections; +import java.util.UUID; import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE; @@ -47,4 +48,10 @@ public class DefaultDeviceSessionCacheService implements DeviceSessionCacheServi return sessions; } + public static void main (String[] args){ + UUID uuid = UUID.fromString("d5db434e-9cd2-4903-8b3b-421b2d93664d"); + System.out.println(uuid.getMostSignificantBits()); + System.out.println(uuid.getLeastSignificantBits()); + } + } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 60ce107a27..a2294d80d0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -153,6 +153,8 @@ sql: # Actor system parameters actors: + cluster: + grpc_callback_thread_pool_size: "${ACTORS_CLUSTER_GRPC_CALLBACK_THREAD_POOL_SIZE:10}" tenant: create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}" session: