diff --git a/application/pom.xml b/application/pom.xml index ad4dd02794..e964f94373 100644 --- a/application/pom.xml +++ b/application/pom.xml @@ -57,6 +57,10 @@ linux-x86_64 + + org.thingsboard.common + actor + org.thingsboard.common util @@ -173,14 +177,6 @@ org.springframework spring-context-support - - com.typesafe.akka - akka-actor_${scala.version} - - - com.typesafe.akka - akka-slf4j_${scala.version} - org.slf4j slf4j-api diff --git a/application/src/main/conf/logback.xml b/application/src/main/conf/logback.xml index 46b42182fb..6f930bb19f 100644 --- a/application/src/main/conf/logback.xml +++ b/application/src/main/conf/logback.xml @@ -35,7 +35,6 @@ - diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index b9d1b3002d..7b9e1d0fd0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -15,9 +15,6 @@ */ package org.thingsboard.server.actors; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Scheduler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -25,8 +22,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -91,12 +86,13 @@ import java.io.StringWriter; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Component public class ActorSystemContext { - private static final String AKKA_CONF_FILE_NAME = "actor-system.conf"; protected final ObjectMapper mapper = new ObjectMapper(); @@ -260,14 +256,6 @@ public class ActorSystemContext { @Getter private long syncSessionTimeout; - @Value("${actors.queue.enabled}") - @Getter - private boolean queuePersistenceEnabled; - - @Value("${actors.queue.timeout}") - @Getter - private long queuePersistenceTimeout; - @Value("${actors.rule.chain.error_persist_frequency}") @Getter private long ruleChainErrorPersistFrequency; @@ -327,17 +315,14 @@ public class ActorSystemContext { @Getter @Setter - private ActorSystem actorSystem; + private TbActorSystem actorSystem; @Setter - private ActorRef appActor; + private TbActorRef appActor; @Getter @Setter - private ActorRef statsActor; - - @Getter - private final Config config; + private TbActorRef statsActor; @Autowired(required = false) @Getter @@ -351,14 +336,8 @@ public class ActorSystemContext { @Getter private RedisTemplate redisTemplate; - public ActorSystemContext() { - config = ConfigFactory.parseResources(AKKA_CONF_FILE_NAME).withFallback(ConfigFactory.load()); - } - - - - public Scheduler getScheduler() { - return actorSystem.scheduler(); + public ScheduledExecutorService getScheduler() { + return actorSystem.getScheduler(); } public void persistError(TenantId tenantId, EntityId entityId, String method, Exception e) { @@ -531,7 +510,18 @@ public class ActorSystemContext { return Exception.class.isInstance(error) ? (Exception) error : new Exception(error); } - public void tell(TbActorMsg tbActorMsg, ActorRef sender) { - appActor.tell(tbActorMsg, sender); + public void tell(TbActorMsg tbActorMsg) { + appActor.tell(tbActorMsg); + } + + + public void schedulePeriodicMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); + getScheduler().scheduleWithFixedDelay(() -> ctx.tell(msg), delayInMs, periodInMs, TimeUnit.MILLISECONDS); + } + + public void scheduleMsgWithDelay(TbActorRef ctx, TbActorMsg msg, long delayInMs) { + log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); + getScheduler().schedule(() -> ctx.tell(msg), delayInMs, TimeUnit.MILLISECONDS); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/TbEntityTypeActorIdPredicate.java b/application/src/main/java/org/thingsboard/server/actors/TbEntityTypeActorIdPredicate.java new file mode 100644 index 0000000000..480b3b9ae3 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/TbEntityTypeActorIdPredicate.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.function.Predicate; + +@RequiredArgsConstructor +public class TbEntityTypeActorIdPredicate implements Predicate { + + private final EntityType entityType; + + @Override + public boolean test(TbActorId actorId) { + return actorId instanceof TbEntityActorId && testEntityId(((TbEntityActorId) actorId).getEntityId()); + } + + protected boolean testEntityId(EntityId entityId) { + return entityId.getEntityType().equals(entityType); + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index b7782a6a87..3976349de4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -15,21 +15,19 @@ */ package org.thingsboard.server.actors.app; -import akka.actor.ActorRef; -import akka.actor.LocalActorRef; -import akka.actor.OneForOneStrategy; -import akka.actor.Props; -import akka.actor.SupervisorStrategy; -import akka.actor.Terminated; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.tenant.TenantActor; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -43,38 +41,27 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; -import scala.concurrent.duration.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; +@Slf4j public class AppActor extends ContextAwareActor { private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); private final TenantService tenantService; - private final BiMap tenantActors; private final Set deletedTenants; private boolean ruleChainsInitialized; private AppActor(ActorSystemContext systemContext) { super(systemContext); this.tenantService = systemContext.getTenantService(); - this.tenantActors = HashBiMap.create(); this.deletedTenants = new HashSet<>(); } @Override - public SupervisorStrategy supervisorStrategy() { - return strategy; - } - - @Override - public void preStart() { - } - - @Override - protected boolean process(TbActorMsg msg) { + protected boolean doProcess(TbActorMsg msg) { if (!ruleChainsInitialized) { initTenantActors(); ruleChainsInitialized = true; @@ -86,7 +73,7 @@ public class AppActor extends ContextAwareActor { case APP_INIT_MSG: break; case PARTITION_CHANGE_MSG: - broadcast(msg); + ctx.broadcastToChildren(msg); break; case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); @@ -145,19 +132,15 @@ public class AppActor extends ContextAwareActor { msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); } else { if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); + getOrCreateTenantActor(msg.getTenantId()).tell(msg); } else { msg.getTbMsg().getCallback().onSuccess(); } } } - protected void broadcast(Object msg) { - tenantActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); - } - private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { - ActorRef target = null; + TbActorRef target = null; if (SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); } else { @@ -166,17 +149,13 @@ public class AppActor extends ContextAwareActor { log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); TenantId tenantId = new TenantId(msg.getEntityId().getId()); deletedTenants.add(tenantId); - ActorRef tenantActor = tenantActors.get(tenantId); - if (tenantActor != null) { - log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor); - context().stop(tenantActor); - } + ctx.stop(new TbEntityActorId(tenantId)); } else { target = getOrCreateTenantActor(msg.getTenantId()); } } if (target != null) { - target.tell(msg, ActorRef.noSender()); + target.tell(msg); } else { log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg); } @@ -184,7 +163,7 @@ public class AppActor extends ContextAwareActor { private void onToDeviceActorMsg(TenantAwareMsg msg) { if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender()); + getOrCreateTenantActor(msg.getTenantId()).tell(msg); } else { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); @@ -192,49 +171,27 @@ public class AppActor extends ContextAwareActor { } } - private ActorRef getOrCreateTenantActor(TenantId tenantId) { - return tenantActors.computeIfAbsent(tenantId, k -> { - log.info("[{}] Creating tenant actor.", tenantId); - ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()); - context().watch(tenantActor); - log.info("[{}] Created tenant actor: {}.", tenantId, tenantActor); - return tenantActor; - }); + private TbActorRef getOrCreateTenantActor(TenantId tenantId) { + return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), + () -> DefaultActorService.TENANT_DISPATCHER_NAME, + () -> new TenantActor.ActorCreator(systemContext, tenantId)); } - @Override - protected void processTermination(Terminated message) { - ActorRef terminated = message.actor(); - if (terminated instanceof LocalActorRef) { - boolean removed = tenantActors.inverse().remove(terminated) != null; - if (removed) { - log.debug("[{}] Removed actor:", terminated); - } - } else { - throw new IllegalStateException("Remote actors are not supported!"); - } - } - - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; + public static class ActorCreator extends ContextBasedCreator { public ActorCreator(ActorSystemContext context) { super(context); } @Override - public AppActor create() { + public TbActorId createActorId() { + return new TbEntityActorId(new TenantId(EntityId.NULL_UUID)); + } + + @Override + public TbActor createActor() { return new AppActor(context); } } - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { - log.warn("Unknown failure", t); - if (t instanceof RuntimeException) { - return SupervisorStrategy.restart(); - } else { - return SupervisorStrategy.stop(); - } - }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 256ffd0b30..9c0215fc3a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -15,9 +15,11 @@ */ package org.thingsboard.server.actors.device; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -26,6 +28,7 @@ import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeout import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; +@Slf4j public class DeviceActor extends ContextAwareActor { private final DeviceActorMessageProcessor processor; @@ -36,10 +39,11 @@ public class DeviceActor extends ContextAwareActor { } @Override - public void preStart() { + public void init(TbActorCtx ctx) { + super.init(ctx); log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); try { - processor.initSessionTimeout(context()); + processor.initSessionTimeout(ctx); log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); } catch (Exception e) { log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e); @@ -47,18 +51,13 @@ public class DeviceActor extends ContextAwareActor { } @Override - public void postStop() { - - } - - @Override - protected boolean process(TbActorMsg msg) { + protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case TRANSPORT_TO_DEVICE_ACTOR_MSG: - processor.process(context(), (TransportToDeviceActorMsgWrapper) msg); + processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg); break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: - processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg); + processor.processAttributesUpdate(ctx, (DeviceAttributesEventNotificationMsg) msg); break; case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processCredentialsUpdate(); @@ -67,10 +66,10 @@ public class DeviceActor extends ContextAwareActor { processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg); break; case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: - processor.processRpcRequest(context(), (ToDeviceRpcRequestActorMsg) msg); + processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg); break; case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG: - processor.processServerSideRpcTimeout(context(), (DeviceActorServerSideRpcTimeoutMsg) msg); + processor.processServerSideRpcTimeout(ctx, (DeviceActorServerSideRpcTimeoutMsg) msg); break; case SESSION_TIMEOUT_MSG: processor.checkSessionsTimeout(); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java index 5a77003895..3624378989 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java @@ -16,12 +16,14 @@ package org.thingsboard.server.actors.device; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; -public class DeviceActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; +public class DeviceActorCreator extends ContextBasedCreator { private final TenantId tenantId; private final DeviceId deviceId; @@ -33,7 +35,13 @@ public class DeviceActorCreator extends ContextBasedCreator { } @Override - public DeviceActor create() { + public TbActorId createActorId() { + return new TbEntityActorId(deviceId); + } + + @Override + public TbActor createActor() { return new DeviceActor(context, tenantId, deviceId); } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index d908258cc9..e328931c0e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.actors.device; -import akka.actor.ActorContext; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -27,6 +26,7 @@ import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.id.DeviceId; @@ -127,7 +127,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - void processRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg) { + void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) { ToDeviceRpcRequest request = msg.getMsg(); ToDeviceRpcRequestBody body = request.getBody(); ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder().setRequestId( @@ -162,13 +162,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { + private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent)); DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout); scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout()); } - void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) { + void processServerSideRpcTimeout(TbActorCtx context, DeviceActorServerSideRpcTimeoutMsg msg) { ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); @@ -177,7 +177,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) { + private void sendPendingRequests(TbActorCtx context, UUID sessionId, SessionInfoProto sessionInfo) { SessionType sessionType = getSessionType(sessionId); if (!toDeviceRpcPendingMap.isEmpty()) { log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); @@ -198,7 +198,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sentOneWayIds.forEach(toDeviceRpcPendingMap::remove); } - private Consumer> processPendingRpc(ActorContext context, UUID sessionId, String nodeId, Set sentOneWayIds) { + private Consumer> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set sentOneWayIds) { return entry -> { ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg(); ToDeviceRpcRequestBody body = request.getBody(); @@ -212,7 +212,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { }; } - void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { + void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) { TransportToDeviceActorMsg msg = wrapper.getMsg(); TbCallback callback = wrapper.getCallback(); if (msg.hasSessionEvent()) { @@ -239,7 +239,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { callback.onSuccess(); } - private void handleClaimDeviceMsg(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) { + private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg) { DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB())); systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs()); } @@ -252,7 +252,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { systemContext.getDeviceStateService().onDeviceDisconnect(deviceId); } - private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { + private void handleGetAttributesRequest(TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { int requestId = request.getRequestId(); Futures.addCallback(getAttributesKvEntries(request), new FutureCallback>>() { @Override @@ -310,7 +310,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { return sessions.containsKey(sessionId) ? SessionType.ASYNC : SessionType.SYNC; } - void processAttributesUpdate(ActorContext context, DeviceAttributesEventNotificationMsg msg) { + void processAttributesUpdate(TbActorCtx context, DeviceAttributesEventNotificationMsg msg) { if (attributeSubscriptions.size() > 0) { boolean hasNotificationData = false; AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder(); @@ -349,7 +349,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) { + private void processRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) { UUID sessionId = getSessionId(sessionInfo); log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); @@ -362,7 +362,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { + private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); @@ -383,7 +383,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); } - private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { + private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); @@ -433,7 +433,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } } - private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { + private void handleSessionActivity(TbActorCtx context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) { UUID sessionId = getSessionId(sessionInfoProto); SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId, id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), 0L)); @@ -612,8 +612,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .addAllSessions(sessionsList).build().toByteArray()); } - void initSessionTimeout(ActorContext context) { - schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout()); + void initSessionTimeout(TbActorCtx ctx) { + schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout()); } void checkSessionsTimeout() { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7c89eef1c5..8f831ada16 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.actors.ruleChain; -import akka.actor.ActorRef; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.EventLoopGroup; @@ -29,6 +28,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; @@ -62,7 +63,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; -import scala.concurrent.duration.Duration; import java.util.Collections; import java.util.Set; @@ -104,7 +104,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); } - nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null), nodeCtx.getSelfActor()); + nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); } @Override @@ -130,7 +130,7 @@ class DefaultTbContext implements TbContext { .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(tbMsg)).build(); - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure)); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure)); } @Override @@ -187,7 +187,7 @@ class DefaultTbContext implements TbContext { if (failureMessage != null) { msg.setFailureMessage(failureMessage); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure)); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure)); } @Override @@ -203,8 +203,8 @@ class DefaultTbContext implements TbContext { return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition(); } - private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { - mainCtx.getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, mainCtx.getActorSystem().dispatcher(), nodeCtx.getSelfActor()); + private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) { + mainCtx.scheduleMsgWithDelay(target, msg, delayInMs); } @Override @@ -213,7 +213,7 @@ class DefaultTbContext implements TbContext { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th); } nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE), - msg, th != null ? th.getMessage() : null), nodeCtx.getSelfActor()); + msg, th != null ? th.getMessage() : null)); } public void updateSelf(RuleNode self) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java index 027f4aac9d..aac7094fc2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java @@ -15,9 +15,11 @@ */ package org.thingsboard.server.actors.ruleChain; -import akka.actor.OneForOneStrategy; -import akka.actor.SupervisorStrategy; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.ComponentActor; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.common.data.id.RuleChainId; @@ -27,18 +29,24 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; -import scala.concurrent.duration.Duration; public class RuleChainActor extends ComponentActor { + private final RuleChain ruleChain; + private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChain ruleChain) { super(systemContext, tenantId, ruleChain.getId()); - setProcessor(new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext, - context().parent(), context().self())); + this.ruleChain = ruleChain; } @Override - protected boolean process(TbActorMsg msg) { + protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) { + return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext, + ctx.getParentRef(), ctx); + } + + @Override + protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); @@ -64,7 +72,7 @@ public class RuleChainActor extends ComponentActor { + public static class ActorCreator extends ContextBasedCreator { private static final long serialVersionUID = 1L; private final TenantId tenantId; @@ -77,7 +85,12 @@ public class RuleChainActor extends ComponentActor { - logAndPersist("Unknown Failure", ActorSystemContext.toException(t)); - return SupervisorStrategy.resume(); - }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 61c5c0da28..4c59b1fa40 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -15,12 +15,12 @@ */ package org.thingsboard.server.actors.ruleChain; -import akka.actor.ActorContext; -import akka.actor.ActorRef; -import akka.actor.Props; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.TbRelationTypes; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.ComponentMsgProcessor; import org.thingsboard.server.common.data.EntityType; @@ -62,8 +62,8 @@ import java.util.stream.Collectors; @Slf4j public class RuleChainActorMessageProcessor extends ComponentMsgProcessor { - private final ActorRef parent; - private final ActorRef self; + private final TbActorRef parent; + private final TbActorRef self; private final Map nodeActors; private final Map> nodeRoutes; private final RuleChainService service; @@ -75,7 +75,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor { log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId); RuleNodeCtx removed = nodeActors.remove(ruleNodeId); - removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED), self); + removed.getSelfActor().tell(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED)); }); initRoutes(ruleChain, ruleNodeList); @@ -145,26 +145,23 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor actorRef.tell(msg, self)); + nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tell(msg)); } - private ActorRef createRuleNodeActor(ActorContext context, RuleNode ruleNode) { - String dispatcherName = tenantId.getId().equals(EntityId.NULL_UUID) ? - DefaultActorService.SYSTEM_RULE_DISPATCHER_NAME : DefaultActorService.TENANT_RULE_DISPATCHER_NAME; - return context.actorOf( - Props.create(new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId())) - .withDispatcher(dispatcherName), ruleNode.getId().toString()); + private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) { + return ctx.getOrCreateChildActor(new TbEntityActorId(ruleNode.getId()), + () -> DefaultActorService.RULE_DISPATCHER_NAME, + () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleNode.getName(), ruleNode.getId())); } private void initRoutes(RuleChain ruleChain, List ruleNodeList) { @@ -303,7 +300,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor actors; @Getter protected RuleChain rootChain; @Getter - protected ActorRef rootChainActor; + protected TbActorRef rootChainActor; public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); this.tenantId = tenantId; - this.actors = HashBiMap.create(); this.ruleChainService = systemContext.getRuleChainService(); } @@ -58,46 +58,41 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) { RuleChainId ruleChainId = ruleChain.getId(); log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId()); - //TODO: remove this cast making UUIDBased subclass of EntityId an interface and vice versa. - ActorRef actorRef = getOrCreateActor(this.context(), ruleChainId, id -> ruleChain); + TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain); visit(ruleChain, actorRef); log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId()); } } - protected void visit(RuleChain entity, ActorRef actorRef) { + protected void visit(RuleChain entity, TbActorRef actorRef) { if (entity != null && entity.isRoot()) { rootChain = entity; rootChainActor = actorRef; } } - public ActorRef getOrCreateActor(akka.actor.ActorContext context, RuleChainId ruleChainId) { - return getOrCreateActor(context, ruleChainId, eId -> ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, eId)); + protected TbActorRef getOrCreateActor(RuleChainId ruleChainId) { + return getOrCreateActor(ruleChainId, eId -> ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, eId)); } - public ActorRef getOrCreateActor(akka.actor.ActorContext context, RuleChainId ruleChainId, Function provider) { - return actors.computeIfAbsent(ruleChainId, eId -> { - RuleChain ruleChain = provider.apply(eId); - return context.actorOf(Props.create(new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain)) - .withDispatcher(DefaultActorService.TENANT_RULE_DISPATCHER_NAME), eId.toString()); - }); + protected TbActorRef getOrCreateActor(RuleChainId ruleChainId, Function provider) { + return ctx.getOrCreateChildActor(new TbEntityActorId(ruleChainId), + () -> DefaultActorService.RULE_DISPATCHER_NAME, + () -> { + RuleChain ruleChain = provider.apply(ruleChainId); + return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); + }); } - protected ActorRef getEntityActorRef(EntityId entityId) { - ActorRef target = null; + protected TbActorRef getEntityActorRef(EntityId entityId) { + TbActorRef target = null; if (entityId.getEntityType() == EntityType.RULE_CHAIN) { - target = getOrCreateActor(this.context(), (RuleChainId) entityId); + target = getOrCreateActor((RuleChainId) entityId); } return target; } - protected void broadcast(Object msg) { - actors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); + protected void broadcast(TbActorMsg msg) { + ctx.broadcastToChildren(msg, new TbEntityTypeActorIdPredicate(EntityType.RULE_CHAIN)); } - - public ActorRef get(RuleChainId id) { - return actors.get(id); - } - } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java index 74316a996b..f11c087958 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActor.java @@ -15,31 +15,43 @@ */ package org.thingsboard.server.actors.ruleChain; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbEntityActorId; import org.thingsboard.server.actors.service.ComponentActor; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; +@Slf4j public class RuleNodeActor extends ComponentActor { private final String ruleChainName; private final RuleChainId ruleChainId; + private final RuleNodeId ruleNodeId; private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) { super(systemContext, tenantId, ruleNodeId); this.ruleChainName = ruleChainName; this.ruleChainId = ruleChainId; - setProcessor(new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, - context().parent(), context().self())); + this.ruleNodeId = ruleNodeId; } @Override - protected boolean process(TbActorMsg msg) { + protected RuleNodeActorMessageProcessor createProcessor(TbActorCtx ctx) { + return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx.getParentRef(), ctx); + } + + @Override + protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); @@ -93,8 +105,7 @@ public class RuleNodeActor extends ComponentActor { - private static final long serialVersionUID = 1L; + public static class ActorCreator extends ContextBasedCreator { private final TenantId tenantId; private final RuleChainId ruleChainId; @@ -111,7 +122,12 @@ public class RuleNodeActor extends ComponentActor { private final String ruleChainName; - private final ActorRef self; + private final TbActorRef self; private RuleNode ruleNode; private TbNode tbNode; private DefaultTbContext defaultCtx; RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext - , ActorRef parent, ActorRef self) { + , TbActorRef parent, TbActorRef self) { super(systemContext, tenantId, ruleNodeId); this.ruleChainName = ruleChainName; this.self = self; @@ -49,7 +49,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor> extends ContextAwareActor { private long lastPersistedErrorTs = 0L; @@ -43,15 +45,19 @@ public abstract class ComponentActor implements Creator { - - private static final long serialVersionUID = 1L; +public abstract class ContextBasedCreator implements TbActorCreator { protected final transient ActorSystemContext context; diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 7b04e82786..b02ee1a01a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -15,85 +15,116 @@ */ package org.thingsboard.server.actors.service; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Terminated; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.DefaultTbActorSystem; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbActorSystem; +import org.thingsboard.server.actors.TbActorSystemSettings; import org.thingsboard.server.actors.app.AppActor; import org.thingsboard.server.actors.app.AppInitMsg; import org.thingsboard.server.actors.stats.StatsActor; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; @Service @Slf4j public class DefaultActorService implements ActorService { - private static final String ACTOR_SYSTEM_NAME = "Akka"; - public static final String APP_DISPATCHER_NAME = "app-dispatcher"; - public static final String CORE_DISPATCHER_NAME = "core-dispatcher"; - public static final String SYSTEM_RULE_DISPATCHER_NAME = "system-rule-dispatcher"; - public static final String TENANT_RULE_DISPATCHER_NAME = "rule-dispatcher"; + public static final String TENANT_DISPATCHER_NAME = "tenant-dispatcher"; + public static final String DEVICE_DISPATCHER_NAME = "device-dispatcher"; + public static final String RULE_DISPATCHER_NAME = "rule-dispatcher"; @Autowired private ActorSystemContext actorContext; - private ActorSystem system; + private TbActorSystem system; - private ActorRef appActor; + private TbActorRef appActor; + + @Value("${actors.system.throughput:5}") + private int actorThroughput; + + @Value("${actors.system.max_actor_init_attempts:10}") + private int maxActorInitAttempts; + + @Value("${actors.system.scheduler_pool_size:1}") + private int schedulerPoolSize; + + @Value("${actors.system.app_dispatcher_pool_size:1}") + private int appDispatcherSize; + + @Value("${actors.system.tenant_dispatcher_pool_size:2}") + private int tenantDispatcherSize; + + @Value("${actors.system.device_dispatcher_pool_size:4}") + private int deviceDispatcherSize; + + @Value("${actors.system.rule_dispatcher_pool_size:4}") + private int ruleDispatcherSize; @PostConstruct public void initActorSystem() { - log.info("Initializing Actor system."); + log.info("Initializing actor system."); actorContext.setActorService(this); - system = ActorSystem.create(ACTOR_SYSTEM_NAME, actorContext.getConfig()); + TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); + system = new DefaultTbActorSystem(settings); + + system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(appDispatcherSize)); + system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(tenantDispatcherSize)); + system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(deviceDispatcherSize)); + system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(ruleDispatcherSize)); + actorContext.setActorSystem(system); - appActor = system.actorOf(Props.create(new AppActor.ActorCreator(actorContext)).withDispatcher(APP_DISPATCHER_NAME), "appActor"); + appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext)); actorContext.setAppActor(appActor); - ActorRef statsActor = system.actorOf(Props.create(new StatsActor.ActorCreator(actorContext)).withDispatcher(CORE_DISPATCHER_NAME), "statsActor"); + TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor")); actorContext.setStatsActor(statsActor); log.info("Actor system initialized."); } + private ExecutorService initDispatcherExecutor(int poolSize) { + if (poolSize == 0) { + int cores = Runtime.getRuntime().availableProcessors(); + poolSize = Math.max(1, cores / 2); + } + return Executors.newWorkStealingPool(poolSize); + } + @EventListener(ApplicationReadyEvent.class) public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { log.info("Received application ready event. Sending application init message to actor system"); - appActor.tell(new AppInitMsg(), ActorRef.noSender()); + appActor.tell(new AppInitMsg()); } @EventListener(PartitionChangeEvent.class) public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { log.info("Received partition change event."); - this.appActor.tell(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions()), ActorRef.noSender()); + this.appActor.tell(new PartitionChangeMsg(partitionChangeEvent.getServiceQueueKey(), partitionChangeEvent.getPartitions())); } @PreDestroy public void stopActorSystem() { - Future status = system.terminate(); - try { - Terminated terminated = Await.result(status, Duration.Inf()); - log.info("Actor system terminated: {}", terminated); - } catch (Exception e) { - log.error("Failed to terminate actor system.", e); + if (system != null) { + log.info("Stopping actor system."); + system.stop(); + log.info("Actor system stopped."); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java index 064d5f28c8..dc78416c06 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java @@ -15,18 +15,13 @@ */ package org.thingsboard.server.actors.shared; -import akka.actor.ActorContext; -import akka.actor.ActorRef; -import akka.actor.Scheduler; -import akka.event.LoggingAdapter; import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.AllArgsConstructor; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.Duration; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.common.msg.TbActorMsg; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j @@ -40,31 +35,16 @@ public abstract class AbstractContextAwareMsgProcessor { this.systemContext = systemContext; } - private Scheduler getScheduler() { + private ScheduledExecutorService getScheduler() { return systemContext.getScheduler(); } - private ExecutionContextExecutor getSystemDispatcher() { - return systemContext.getActorSystem().dispatcher(); + protected void schedulePeriodicMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs, long periodInMs) { + systemContext.schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs); } - protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) { - schedulePeriodicMsgWithDelay(msg, delayInMs, periodInMs, ctx.self()); + protected void scheduleMsgWithDelay(TbActorCtx ctx, TbActorMsg msg, long delayInMs) { + systemContext.scheduleMsgWithDelay(ctx, msg, delayInMs); } - private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) { - log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); - getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); - } - - protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) { - scheduleMsgWithDelay(msg, delayInMs, ctx.self()); - } - - private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { - log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); - getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); - } - - } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java index 45779a9f89..8ced2be204 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java @@ -15,16 +15,15 @@ */ package org.thingsboard.server.actors.shared; -import akka.actor.ActorContext; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.stats.StatsPersistTick; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; -import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeException; @Slf4j @@ -42,38 +41,38 @@ public abstract class ComponentMsgProcessor extends Abstract public abstract String getComponentName(); - public abstract void start(ActorContext context) throws Exception; + public abstract void start(TbActorCtx context) throws Exception; - public abstract void stop(ActorContext context) throws Exception; + public abstract void stop(TbActorCtx context) throws Exception; public abstract void onPartitionChangeMsg(PartitionChangeMsg msg) throws Exception; - public void onCreated(ActorContext context) throws Exception { + public void onCreated(TbActorCtx context) throws Exception { start(context); } - public void onUpdate(ActorContext context) throws Exception { + public void onUpdate(TbActorCtx context) throws Exception { restart(context); } - public void onActivate(ActorContext context) throws Exception { + public void onActivate(TbActorCtx context) throws Exception { restart(context); } - public void onSuspend(ActorContext context) throws Exception { + public void onSuspend(TbActorCtx context) throws Exception { stop(context); } - public void onStop(ActorContext context) throws Exception { + public void onStop(TbActorCtx context) throws Exception { stop(context); } - private void restart(ActorContext context) throws Exception { + private void restart(TbActorCtx context) throws Exception { stop(context); start(context); } - public void scheduleStatsPersistTick(ActorContext context, long statsPersistFrequency) { + public void scheduleStatsPersistTick(TbActorCtx context, long statsPersistFrequency) { schedulePeriodicMsgWithDelay(context, new StatsPersistTick(), statsPersistFrequency, statsPersistFrequency); } diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java index 717dcecc19..1718305cde 100644 --- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java @@ -19,10 +19,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbStringActorId; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Event; +import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; @Slf4j @@ -35,24 +40,17 @@ public class StatsActor extends ContextAwareActor { } @Override - protected boolean process(TbActorMsg msg) { - //TODO Move everything here, to work with TbActorMsg\ - return false; - } - - @Override - public void onReceive(Object msg) { + protected boolean doProcess(TbActorMsg msg) { log.debug("Received message: {}", msg); - if (msg instanceof StatsPersistMsg) { - try { - onStatsPersistMsg((StatsPersistMsg) msg); - } catch (Exception e) { - log.warn("Failed to persist statistics: {}", msg, e); - } + if (msg.getMsgType().equals(MsgType.STATS_PERSIST_MSG)) { + onStatsPersistMsg((StatsPersistMsg) msg); + return true; + } else { + return false; } } - public void onStatsPersistMsg(StatsPersistMsg msg) throws Exception { + public void onStatsPersistMsg(StatsPersistMsg msg) { Event event = new Event(); event.setEntityId(msg.getEntityId()); event.setTenantId(msg.getTenantId()); @@ -65,15 +63,21 @@ public class StatsActor extends ContextAwareActor { return mapper.createObjectNode().put("server", serviceId).put("messagesProcessed", messagesProcessed).put("errorsOccurred", errorsOccurred); } - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; + public static class ActorCreator extends ContextBasedCreator { + private final String actorId; - public ActorCreator(ActorSystemContext context) { + public ActorCreator(ActorSystemContext context, String actorId) { super(context); + this.actorId = actorId; } @Override - public StatsActor create() { + public TbActorId createActorId() { + return new TbStringActorId(actorId); + } + + @Override + public TbActor createActor() { return new StatsActor(context); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java index a9e2583878..0d946f46ff 100644 --- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java +++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsPersistMsg.java @@ -20,13 +20,21 @@ import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorMsg; @AllArgsConstructor @Getter @ToString -public final class StatsPersistMsg { +public final class StatsPersistMsg implements TbActorMsg { + private long messagesProcessed; private long errorsOccurred; private TenantId tenantId; private EntityId entityId; + + @Override + public MsgType getMsgType() { + return MsgType.STATS_PERSIST_MSG; + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 3cee82127c..3741fb564e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -15,16 +15,15 @@ */ package org.thingsboard.server.actors.tenant; -import akka.actor.ActorInitializationException; -import akka.actor.ActorRef; -import akka.actor.LocalActorRef; -import akka.actor.OneForOneStrategy; -import akka.actor.Props; -import akka.actor.SupervisorStrategy; -import akka.actor.Terminated; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbActorNotRegisteredException; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbEntityActorId; +import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate; import org.thingsboard.server.actors.device.DeviceActorCreator; import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -32,6 +31,7 @@ import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleChain; @@ -45,32 +45,25 @@ import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.ServiceType; -import scala.concurrent.duration.Duration; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; +@Slf4j public class TenantActor extends RuleChainManagerActor { - private final BiMap deviceActors; private boolean isRuleEngineForCurrentTenant; private boolean isCore; private TenantActor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext, tenantId); - this.deviceActors = HashBiMap.create(); - } - - @Override - public SupervisorStrategy supervisorStrategy() { - return strategy; } boolean cantFindTenant = false; @Override - public void preStart() { + public void init(TbActorCtx ctx) { + super.init(ctx); log.info("[{}] Starting tenant actor.", tenantId); try { Tenant tenant = systemContext.getTenantService().findTenantById(tenantId); @@ -104,12 +97,12 @@ public class TenantActor extends RuleChainManagerActor { } @Override - public void postStop() { + public void destroy() { log.info("[{}] Stopping tenant actor.", tenantId); } @Override - protected boolean process(TbActorMsg msg) { + protected boolean doProcess(TbActorMsg msg) { if (cantFindTenant) { log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg); if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) { @@ -126,13 +119,13 @@ public class TenantActor extends RuleChainManagerActor { //To Rule Chain Actors broadcast(msg); } else if (ServiceType.TB_CORE.equals(serviceType)) { - //To Device Actors - List repartitionedDevices = - deviceActors.keySet().stream().filter(deviceId -> !isMyPartition(deviceId)).collect(Collectors.toList()); - for (DeviceId deviceId : repartitionedDevices) { - ActorRef deviceActor = deviceActors.remove(deviceId); - context().stop(deviceActor); - } + List deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) { + @Override + protected boolean testEntityId(EntityId entityId) { + return super.testEntityId(entityId) && !isMyPartition(entityId); + } + }); + deviceActorIds.forEach(id -> ctx.stop(id)); } break; case COMPONENT_LIFE_CYCLE_MSG: @@ -158,8 +151,8 @@ public class TenantActor extends RuleChainManagerActor { return true; } - private boolean isMyPartition(DeviceId deviceId) { - return systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId).isMyPartition(); + private boolean isMyPartition(EntityId entityId) { + return systemContext.resolve(ServiceType.TB_CORE, tenantId, entityId).isMyPartition(); } private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { @@ -170,16 +163,15 @@ public class TenantActor extends RuleChainManagerActor { TbMsg tbMsg = msg.getTbMsg(); if (tbMsg.getRuleChainId() == null) { if (getRootChainActor() != null) { - getRootChainActor().tell(msg, self()); + getRootChainActor().tell(msg); } else { tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!")); log.info("[{}] No Root Chain: {}", tenantId, msg); } } else { - ActorRef ruleChainActor = get(tbMsg.getRuleChainId()); - if (ruleChainActor != null) { - ruleChainActor.tell(msg, self()); - } else { + try { + ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); + } catch (TbActorNotRegisteredException ex) { log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId()); //TODO: 3.1 Log it to dead letters queue; tbMsg.getCallback().onSuccess(); @@ -188,61 +180,39 @@ public class TenantActor extends RuleChainManagerActor { } private void onRuleChainMsg(RuleChainAwareMsg msg) { - getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self()); + getOrCreateActor(msg.getRuleChainId()).tell(msg); } private void onToDeviceActorMsg(DeviceAwareMsg msg) { if (!isCore) { log.warn("RECEIVED INVALID MESSAGE: {}", msg); } - getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender()); + getOrCreateDeviceActor(msg.getDeviceId()).tell(msg); } private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { if (isRuleEngineForCurrentTenant) { - ActorRef target = getEntityActorRef(msg.getEntityId()); + TbActorRef target = getEntityActorRef(msg.getEntityId()); if (target != null) { if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { RuleChain ruleChain = systemContext.getRuleChainService(). findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId())); visit(ruleChain, target); } - target.tell(msg, ActorRef.noSender()); + target.tell(msg); } else { log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); } } } - private ActorRef getOrCreateDeviceActor(DeviceId deviceId) { - return deviceActors.computeIfAbsent(deviceId, k -> { - log.debug("[{}][{}] Creating device actor.", tenantId, deviceId); - ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId)) - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME) - , deviceId.toString()); - context().watch(deviceActor); - log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor); - return deviceActor; - }); + private TbActorRef getOrCreateDeviceActor(DeviceId deviceId) { + return ctx.getOrCreateChildActor(new TbEntityActorId(deviceId), + () -> DefaultActorService.DEVICE_DISPATCHER_NAME, + () -> new DeviceActorCreator(systemContext, tenantId, deviceId)); } - @Override - protected void processTermination(Terminated message) { - ActorRef terminated = message.actor(); - if (terminated instanceof LocalActorRef) { - boolean removed = deviceActors.inverse().remove(terminated) != null; - if (removed) { - log.debug("[{}] Removed actor:", terminated); - } else { - log.debug("Removed actor was not found in the device map!"); - } - } else { - throw new IllegalStateException("Remote actors are not supported!"); - } - } - - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; + public static class ActorCreator extends ContextBasedCreator { private final TenantId tenantId; @@ -252,18 +222,14 @@ public class TenantActor extends RuleChainManagerActor { } @Override - public TenantActor create() { + public TbActorId createActorId() { + return new TbEntityActorId(tenantId); + } + + @Override + public TbActor createActor() { return new TenantActor(context, tenantId); } } - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { - log.warn("[{}] Unknown failure", tenantId, t); - if (t instanceof ActorInitializationException) { - return SupervisorStrategy.stop(); - } else { - return SupervisorStrategy.resume(); - } - }); - } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index b7f6bd3dd2..b7042b2518 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.queue; -import akka.actor.ActorRef; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; @@ -150,7 +149,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get(), ActorRef.noSender()); + actorContext.tell(actorMsg.get()); } callback.onSuccess(); } @@ -279,7 +278,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get(), ActorRef.noSender()); + actorContext.tell(actorMsg.get()); } callback.onSuccess(); } else if (nfMsg.hasFromDeviceRpcResponse()) { @@ -261,7 +259,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage()); - actorContext.tell(msg, ActorRef.noSender()); + actorContext.tell(msg); } @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index f6e5eb8b43..3ac6fea2f7 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.rpc; -import akka.actor.ActorRef; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -122,7 +121,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { log.trace("[{}][{}] Processing local rpc call to device actor [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToDeviceRpcRequests.put(requestId, rpcMsg); - actorContext.tell(rpcMsg, ActorRef.noSender()); + actorContext.tell(rpcMsg); scheduleToDeviceTimeout(request, requestId); } diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf deleted file mode 100644 index 5a20c58fc7..0000000000 --- a/application/src/main/resources/actor-system.conf +++ /dev/null @@ -1,139 +0,0 @@ -# -# Copyright © 2016-2020 The Thingsboard Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -akka { - # JVM shutdown, System.exit(-1), in case of a fatal error, - # such as OutOfMemoryError - jvm-exit-on-fatal-error = off - loglevel = "INFO" - loggers = ["akka.event.slf4j.Slf4jLogger"] -} - -# This dispatcher is used for app -app-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 1 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 1 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 1.0 - } - # How long time the dispatcher will wait for new actors until it shuts down - shutdown-timeout = 1s - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 -} - -# This dispatcher is used for rpc actors -rpc-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 8 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 0.5 - } - # How long time the dispatcher will wait for new actors until it shuts down - shutdown-timeout = 1s - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 -} - -# This dispatcher is used for auth -core-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 12 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 0.25 - } - # How long time the dispatcher will wait for new actors until it shuts down - shutdown-timeout = 1s - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 -} - -# This dispatcher is used for system rule chains and rule node actors -system-rule-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 12 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 0.25 - } - # How long time the dispatcher will wait for new actors until it shuts down - shutdown-timeout = 1s - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 -} - -# This dispatcher is used for tenant rule chains and rule node actors -rule-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 12 - - # The parallelism factor is used to determine thread pool size using the - # following formula: ceil(available processors * factor). Resulting size - # is then bounded by the parallelism-min and parallelism-max values. - parallelism-factor = 0.25 - } - # How long time the dispatcher will wait for new actors until it shuts down - shutdown-timeout = 1s - - # Throughput defines the number of messages that are processed in a batch - # before the thread is returned to the pool. Set to 1 for as fair as possible. - throughput = 5 -} diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index e25fb72ccb..01aec9ceb9 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -26,7 +26,6 @@ - diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index cfd73a2bf4..90b84dfbad 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -279,6 +279,14 @@ sql: # Actor system parameters actors: + system: + throughput: "${ACTORS_SYSTEM_THROUGHPUT:5}" + scheduler_pool_size: "${ACTORS_SYSTEM_SCHEDULER_POOL_SIZE:1}" + max_actor_init_attempts: "${ACTORS_SYSTEM_MAX_ACTOR_INIT_ATTEMPTS:10}" + app_dispatcher_pool_size: "${ACTORS_SYSTEM_APP_DISPATCHER_POOL_SIZE:1}" + tenant_dispatcher_pool_size: "${ACTORS_SYSTEM_TENANT_DISPATCHER_POOL_SIZE:2}" + device_dispatcher_pool_size: "${ACTORS_SYSTEM_DEVICE_DISPATCHER_POOL_SIZE:4}" + rule_dispatcher_pool_size: "${ACTORS_SYSTEM_RULE_DISPATCHER_POOL_SIZE:4}" tenant: create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}" session: @@ -316,11 +324,6 @@ actors: enabled: "${ACTORS_STATISTICS_ENABLED:true}" js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}" persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}" - queue: - # Enable/disable persistence of un-processed messages to the queue - enabled: "${ACTORS_QUEUE_ENABLED:true}" - # Maximum allowed timeout for persistence into the queue - timeout: "${ACTORS_QUEUE_PERSISTENCE_TIMEOUT:30000}" cache: # caffeine or redis diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index b23f90a8d6..86e3f0e635 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.rules.flow; -import akka.actor.ActorRef; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Assert; @@ -25,7 +24,6 @@ import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.*; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -35,7 +33,6 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.TbMsgCallback; @@ -150,7 +147,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); // Pushing Message to the system - actorSystem.tell(qMsg, ActorRef.noSender()); + actorSystem.tell(qMsg); Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess(); PageData eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); @@ -262,7 +259,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); // Pushing Message to the system - actorSystem.tell(qMsg, ActorRef.noSender()); + actorSystem.tell(qMsg); Mockito.verify(tbMsgCallback, Mockito.timeout(10000)).onSuccess(); diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index 85219c3eef..4ff15fd1d6 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.rules.lifecycle; -import akka.actor.ActorRef; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Assert; @@ -25,7 +24,6 @@ import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Event; @@ -39,7 +37,6 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.TbMsgCallback; @@ -141,7 +138,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); // Pushing Message to the system - actorSystem.tell(qMsg, ActorRef.noSender()); + actorSystem.tell(qMsg); Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); diff --git a/application/src/test/resources/logback.xml b/application/src/test/resources/logback.xml index 47dacce343..4f083906e5 100644 --- a/application/src/test/resources/logback.xml +++ b/application/src/test/resources/logback.xml @@ -13,8 +13,6 @@ - - diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/AbstractTbActor.java b/common/actor/src/main/java/org/thingsboard/server/actors/AbstractTbActor.java new file mode 100644 index 0000000000..2179070dd2 --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/AbstractTbActor.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import lombok.Getter; + +public abstract class AbstractTbActor implements TbActor { + + @Getter + protected TbActorCtx ctx; + + @Override + public void init(TbActorCtx ctx) { + this.ctx = ctx; + } + + @Override + public TbActorRef getActorRef() { + return ctx; + } +} diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index 7d89abd3bb..f291c8e804 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -21,13 +21,19 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.msg.TbActorMsg; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.stream.Collectors; @Slf4j @Data @@ -36,6 +42,8 @@ public class DefaultTbActorSystem implements TbActorSystem { private final ConcurrentMap dispatchers = new ConcurrentHashMap<>(); private final ConcurrentMap actors = new ConcurrentHashMap<>(); private final ConcurrentMap actorCreationLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap> parentChildMap = new ConcurrentHashMap<>(); + @Getter private final TbActorSystemSettings settings; @Getter @@ -65,16 +73,21 @@ public class DefaultTbActorSystem implements TbActorSystem { } @Override - public TbActorId createRootActor(String dispatcherId, TbActorCreator creator) { + public TbActorRef getActor(TbActorId actorId) { + return actors.get(actorId); + } + + @Override + public TbActorRef createRootActor(String dispatcherId, TbActorCreator creator) { return createActor(dispatcherId, creator, null); } @Override - public TbActorId createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { + public TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { return createActor(dispatcherId, creator, parent); } - private TbActorId createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { + private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) { Dispatcher dispatcher = dispatchers.get(dispatcherId); if (dispatcher == null) { log.warn("Dispatcher with id [{}] is not registered!", dispatcherId); @@ -93,9 +106,20 @@ public class DefaultTbActorSystem implements TbActorSystem { if (actorMailbox == null) { log.debug("Creating actor with id [{}]!", actorId); TbActor actor = creator.createActor(); - TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parent, actor, dispatcher); + TbActorRef parentRef = null; + if (parent != null) { + parentRef = getActor(parent); + if (parentRef == null) { + throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!"); + } + } + TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher); actors.put(actorId, mailbox); mailbox.initActor(); + actorMailbox = mailbox; + if (parent != null) { + parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId); + } } else { log.debug("Actor with id [{}] is already registered!", actorId); } @@ -104,7 +128,12 @@ public class DefaultTbActorSystem implements TbActorSystem { actorCreationLocks.remove(actorId); } } - return actorId; + return actorMailbox; + } + + @Override + public void tell(TbActorRef target, TbActorMsg actorMsg) { + target.tell(actorMsg); } @Override @@ -116,8 +145,42 @@ public class DefaultTbActorSystem implements TbActorSystem { mailbox.enqueue(actorMsg); } + @Override + public void broadcastToChildren(TbActorId parent, TbActorMsg msg) { + broadcastToChildren(parent, id -> true, msg); + } + + @Override + public void broadcastToChildren(TbActorId parent, Predicate childFilter, TbActorMsg msg) { + Set children = parentChildMap.get(parent); + if (children != null) { + children.stream().filter(childFilter).forEach(id -> tell(id, msg)); + } + } + + @Override + public List filterChildren(TbActorId parent, Predicate childFilter) { + Set children = parentChildMap.get(parent); + if (children != null) { + return children.stream().filter(childFilter).collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + @Override + public void stop(TbActorRef actorRef) { + stop(actorRef.getActorId()); + } + @Override public void stop(TbActorId actorId) { + Set children = parentChildMap.remove(actorId); + if (children != null) { + for (TbActorId child : children) { + stop(child); + } + } TbActorMailbox mailbox = actors.remove(actorId); if (mailbox != null) { mailbox.destroy(); @@ -126,7 +189,17 @@ public class DefaultTbActorSystem implements TbActorSystem { @Override public void stop() { - dispatchers.values().forEach(dispatcher -> dispatcher.getExecutor().shutdownNow()); + dispatchers.values().forEach(dispatcher -> { + dispatcher.getExecutor().shutdown(); + try { + dispatcher.getExecutor().awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("[{}] Failed to stop dispatcher", dispatcher.getDispatcherId(), e); + } + }); + if (scheduler != null) { + scheduler.shutdownNow(); + } actors.clear(); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java index 55419582bc..5fe1206a53 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActor.java @@ -19,11 +19,15 @@ import org.thingsboard.server.common.msg.TbActorMsg; public interface TbActor { - void init(); + boolean process(TbActorMsg msg); - boolean process(TbActorCtx ctx, TbActorMsg msg); + TbActorRef getActorRef(); - void destroy(); + default void init(TbActorCtx ctx) { + } + + default void destroy() { + } default InitFailureStrategy onInitFailure(int attempt, Throwable t) { return InitFailureStrategy.retryWithDelay(5000); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java index 7dd90c3478..d21eea1a57 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java @@ -17,12 +17,25 @@ package org.thingsboard.server.actors; import org.thingsboard.server.common.msg.TbActorMsg; -public interface TbActorCtx { +import java.util.List; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public interface TbActorCtx extends TbActorRef { TbActorId getSelf(); - TbActorId getParent(); + TbActorRef getParentRef(); - void tell(TbActorId target, TbActorMsg actorMsg); + void tell(TbActorId target, TbActorMsg msg); + void stop(TbActorId target); + + TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator); + + void broadcastToChildren(TbActorMsg msg); + + void broadcastToChildren(TbActorMsg msg, Predicate childFilter); + + List filterChildren(Predicate childFilter); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorId.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorId.java index 91f4363348..773ee2ddfb 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorId.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorId.java @@ -15,33 +15,6 @@ */ package org.thingsboard.server.actors; -import org.thingsboard.server.common.data.id.EntityId; +public interface TbActorId { -import java.util.Objects; - -public class TbActorId { - - private final EntityId entityId; - - public TbActorId(EntityId entityId) { - this.entityId = entityId; - } - - @Override - public String toString() { - return entityId.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - TbActorId actorId = (TbActorId) o; - return entityId.equals(actorId.entityId); - } - - @Override - public int hashCode() { - return Objects.hash(entityId); - } } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index cbd3f3903f..3931fa2535 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -19,9 +19,12 @@ import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.TbActorMsg; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.function.Supplier; @Slf4j @Data @@ -35,7 +38,7 @@ public final class TbActorMailbox implements TbActorCtx { private final TbActorSystem system; private final TbActorSystemSettings settings; private final TbActorId selfId; - private final TbActorId parentId; + private final TbActorRef parentRef; private final TbActor actor; private final Dispatcher dispatcher; private final ConcurrentLinkedQueue msgs = new ConcurrentLinkedQueue<>(); @@ -47,11 +50,12 @@ public final class TbActorMailbox implements TbActorCtx { dispatcher.getExecutor().execute(() -> tryInit(1)); } + private void tryInit(int attempt) { try { log.debug("[{}] Trying to init actor, attempt: {}", selfId, attempt); if (!destroyInProgress.get()) { - actor.init(); + actor.init(this); if (!destroyInProgress.get()) { ready.set(READY); tryProcessQueue(false); @@ -94,7 +98,7 @@ public final class TbActorMailbox implements TbActorCtx { if (msg != null) { try { log.debug("[{}] Going to process message: {}", selfId, msg); - actor.process(this, msg); + actor.process(msg); } catch (Throwable t) { log.debug("[{}] Failed to process message: {}", selfId, msg, t); ProcessFailureStrategy strategy = actor.onProcessFailure(t); @@ -121,13 +125,38 @@ public final class TbActorMailbox implements TbActorCtx { } @Override - public TbActorId getParent() { - return parentId; + public void tell(TbActorId target, TbActorMsg actorMsg) { + system.tell(target, actorMsg); } @Override - public void tell(TbActorId target, TbActorMsg actorMsg) { - system.tell(target, actorMsg); + public void broadcastToChildren(TbActorMsg msg) { + system.broadcastToChildren(selfId, msg); + } + + @Override + public void broadcastToChildren(TbActorMsg msg, Predicate childFilter) { + system.broadcastToChildren(selfId, childFilter, msg); + } + + @Override + public List filterChildren(Predicate childFilter) { + return system.filterChildren(selfId, childFilter); + } + + @Override + public void stop(TbActorId target) { + system.stop(target); + } + + @Override + public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator) { + TbActorRef actorRef = system.getActor(actorId); + if (actorRef == null) { + return system.createChildActor(dispatcher.get(), creator.get(), selfId); + } else { + return actorRef; + } } public void destroy() { @@ -141,4 +170,14 @@ public final class TbActorMailbox implements TbActorCtx { } }); } + + @Override + public TbActorId getActorId() { + return selfId; + } + + @Override + public void tell(TbActorMsg actorMsg) { + enqueue(actorMsg); + } } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java new file mode 100644 index 0000000000..b3bc983518 --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorRef.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import org.thingsboard.server.common.msg.TbActorMsg; + +public interface TbActorRef { + + TbActorId getActorId(); + + void tell(TbActorMsg actorMsg); + +} diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java index f2f08fb06a..2a5df1c861 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorSystem.java @@ -17,8 +17,10 @@ package org.thingsboard.server.actors; import org.thingsboard.server.common.msg.TbActorMsg; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; public interface TbActorSystem { @@ -28,14 +30,25 @@ public interface TbActorSystem { void destroyDispatcher(String dispatcherId); - TbActorId createRootActor(String dispatcherId, TbActorCreator creator); + TbActorRef getActor(TbActorId actorId); - TbActorId createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); + TbActorRef createRootActor(String dispatcherId, TbActorCreator creator); + + TbActorRef createChildActor(String dispatcherId, TbActorCreator creator, TbActorId parent); + + void tell(TbActorRef target, TbActorMsg actorMsg); void tell(TbActorId target, TbActorMsg actorMsg); + void stop(TbActorRef actorRef); + void stop(TbActorId actorId); void stop(); + void broadcastToChildren(TbActorId parent, TbActorMsg msg); + + void broadcastToChildren(TbActorId parent, Predicate childFilter, TbActorMsg msg); + + List filterChildren(TbActorId parent, Predicate childFilter); } diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbEntityActorId.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbEntityActorId.java new file mode 100644 index 0000000000..be3a1fc6d8 --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbEntityActorId.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import lombok.Getter; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.Objects; + +public class TbEntityActorId implements TbActorId { + + @Getter + private final EntityId entityId; + + public TbEntityActorId(EntityId entityId) { + this.entityId = entityId; + } + + @Override + public String toString() { + return entityId.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TbEntityActorId that = (TbEntityActorId) o; + return entityId.equals(that.entityId); + } + + @Override + public int hashCode() { + return Objects.hash(entityId); + } +} diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbStringActorId.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbStringActorId.java new file mode 100644 index 0000000000..2c2f3e2ba1 --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbStringActorId.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import java.util.Objects; + +public class TbStringActorId implements TbActorId { + + private final String id; + + public TbStringActorId(String id) { + this.id = id; + } + + @Override + public String toString() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TbStringActorId that = (TbStringActorId) o; + return id.equals(that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } +} diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java index 2f7a786fe3..d2d41d5b79 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/ActorSystemTest.java @@ -81,10 +81,10 @@ public class ActorSystemTest { ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); - TbActorId actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( - new TbActorId(new DeviceId(UUID.randomUUID())), testCtx1)); - TbActorId actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( - new TbActorId(new DeviceId(UUID.randomUUID())), testCtx2)); + TbActorRef actorId1 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( + new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx1)); + TbActorRef actorId2 = actorSystem.createRootActor(ROOT_DISPATCHER, new SlowInitActor.SlowInitActorCreator( + new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx2)); actorSystem.tell(actorId1, new IntTbActorMsg(42)); actorSystem.tell(actorId2, new IntTbActorMsg(42)); @@ -98,7 +98,7 @@ public class ActorSystemTest { public void testOneActorCreated() throws InterruptedException { ActorTestCtx testCtx1 = getActorTestCtx(1); ActorTestCtx testCtx2 = getActorTestCtx(1); - TbActorId actorId = new TbActorId(new DeviceId(UUID.randomUUID())); + TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx1))); submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx2))); @@ -112,7 +112,7 @@ public class ActorSystemTest { @Test public void testActorCreatorCalledOnce() throws InterruptedException { ActorTestCtx testCtx = getActorTestCtx(1); - TbActorId actorId = new TbActorId(new DeviceId(UUID.randomUUID())); + TbActorId actorId = new TbEntityActorId(new DeviceId(UUID.randomUUID())); for(int i =0; i < 1000; i++) { submitPool.submit(() -> actorSystem.createRootActor(ROOT_DISPATCHER, new SlowCreateActor.SlowCreateActorCreator(actorId, testCtx))); } @@ -138,12 +138,12 @@ public class ActorSystemTest { List testCtxes = new ArrayList<>(); - List actorIds = new ArrayList<>(); + List actorRefs = new ArrayList<>(); for (int actorIdx = 0; actorIdx < actorsCount; actorIdx++) { ActorTestCtx testCtx = getActorTestCtx(msgNumber); - actorIds.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( - new TbActorId(new DeviceId(UUID.randomUUID())), testCtx))); + actorRefs.add(actorSystem.createRootActor(ROOT_DISPATCHER, new TestRootActor.TestRootActorCreator( + new TbEntityActorId(new DeviceId(UUID.randomUUID())), testCtx))); testCtxes.add(testCtx); } @@ -151,7 +151,7 @@ public class ActorSystemTest { for (int i = 0; i < msgNumber; i++) { int tmp = randomIntegers[i]; - submitPool.execute(() -> actorIds.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp)))); + submitPool.execute(() -> actorRefs.forEach(actorId -> actorSystem.tell(actorId, new IntTbActorMsg(tmp)))); } log.info("Submitted all messages"); diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/SlowInitActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/SlowInitActor.java index 4cb59be058..e9ff1fd665 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/SlowInitActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/SlowInitActor.java @@ -25,13 +25,13 @@ public class SlowInitActor extends TestRootActor { } @Override - public void init() { + public void init(TbActorCtx ctx) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } - super.init(); + super.init(ctx); } public static class SlowInitActorCreator implements TbActorCreator { diff --git a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java index 54bf044aa8..a58fa12ef3 100644 --- a/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java +++ b/common/actor/src/test/java/org/thingsboard/server/actors/TestRootActor.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.TbActorMsg; @Slf4j -public class TestRootActor implements TbActor { +public class TestRootActor extends AbstractTbActor { @Getter private final TbActorId actorId; @@ -37,12 +37,13 @@ public class TestRootActor implements TbActor { } @Override - public void init() { + public void init(TbActorCtx ctx) { + super.init(ctx); initialized = true; } @Override - public boolean process(TbActorCtx ctx, TbActorMsg msg) { + public boolean process(TbActorMsg msg) { if (initialized) { int value = ((IntTbActorMsg) msg).getValue(); sum += value; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 0345355aa3..d7f0df9838 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -21,7 +21,6 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; /** * Created by ashvayka on 15.03.18. */ -//TODO: add all "See" references public enum MsgType { /** @@ -97,6 +96,7 @@ public enum MsgType { STATS_PERSIST_TICK_MSG, + STATS_PERSIST_MSG, /** * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself. diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/DeviceAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/DeviceAwareMsg.java index ec1028d239..c812711e18 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/DeviceAwareMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/DeviceAwareMsg.java @@ -16,8 +16,9 @@ package org.thingsboard.server.common.msg.aware; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.TbActorMsg; -public interface DeviceAwareMsg { +public interface DeviceAwareMsg extends TbActorMsg { DeviceId getDeviceId(); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java index c9a94b5a3f..a6bacb3594 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/RuleChainAwareMsg.java @@ -16,8 +16,9 @@ package org.thingsboard.server.common.msg.aware; import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.msg.TbActorMsg; -public interface RuleChainAwareMsg { +public interface RuleChainAwareMsg extends TbActorMsg { RuleChainId getRuleChainId(); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/TenantAwareMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/TenantAwareMsg.java index fb1fbb821f..f0ccaa65d8 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/aware/TenantAwareMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/aware/TenantAwareMsg.java @@ -16,8 +16,9 @@ package org.thingsboard.server.common.msg.aware; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; -public interface TenantAwareMsg { +public interface TenantAwareMsg extends TbActorMsg { TenantId getTenantId(); diff --git a/msa/tb/docker/logback.xml b/msa/tb/docker/logback.xml index 256e78436f..51a9c21f3b 100644 --- a/msa/tb/docker/logback.xml +++ b/msa/tb/docker/logback.xml @@ -41,7 +41,6 @@ - diff --git a/packaging/java/scripts/install/logback.xml b/packaging/java/scripts/install/logback.xml index 054ad42b94..d0908584c5 100644 --- a/packaging/java/scripts/install/logback.xml +++ b/packaging/java/scripts/install/logback.xml @@ -57,7 +57,6 @@ - diff --git a/pom.xml b/pom.xml index 2b4471f493..5805cd5694 100755 --- a/pom.xml +++ b/pom.xml @@ -63,8 +63,6 @@ 2.10.2 2.10.2 2.2.6 - 2.13 - 2.6.3 1.0.2 2.6.2 2.3.30 @@ -783,6 +781,11 @@ util ${project.version} + + org.thingsboard.common + actor + ${project.version} + org.thingsboard.common dao-api @@ -1097,16 +1100,6 @@ - - com.typesafe.akka - akka-actor_${scala.version} - ${akka.version} - - - com.typesafe.akka - akka-slf4j_${scala.version} - ${akka.version} - org.eclipse.californium californium-core