diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index f4373db9be..15e9e9d175 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -25,6 +25,9 @@ import akka.actor.Terminated; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Function; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -48,18 +51,17 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +@Slf4j public class AppActor extends RuleChainManagerActor { - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); - - public static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); + private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); private final TenantService tenantService; - private final Map tenantActors; + private final BiMap tenantActors; private AppActor(ActorSystemContext systemContext) { super(systemContext, new SystemRuleChainManager(systemContext)); this.tenantService = systemContext.getTenantService(); - this.tenantActors = new HashMap<>(); + this.tenantActors = HashBiMap.create(); } @Override @@ -69,22 +71,20 @@ public class AppActor extends RuleChainManagerActor { @Override public void preStart() { - logger.info("Starting main system actor."); + log.info("Starting main system actor."); try { initRuleChains(); - if (systemContext.isTenantComponentsInitEnabled()) { PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT); for (Tenant tenant : tenantIterator) { - logger.debug("[{}] Creating tenant actor", tenant.getId()); + log.debug("[{}] Creating tenant actor", tenant.getId()); getOrCreateTenantActor(tenant.getId()); - logger.debug("Tenant actor created."); + log.debug("Tenant actor created."); } } - - logger.info("Main system actor started."); + log.info("Main system actor started."); } catch (Exception e) { - logger.error(e, "Unknown failure"); + log.warn("Unknown failure", e); } } @@ -130,7 +130,7 @@ public class AppActor extends RuleChainManagerActor { private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { if (SYSTEM_TENANT.equals(msg.getTenantId())) { - //TODO: ashvayka handle this. + log.warn("[{}] Invalid service to rule engine msg called. System messages are not supported yet", SYSTEM_TENANT); } else { getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); } @@ -152,7 +152,7 @@ public class AppActor extends RuleChainManagerActor { if (target != null) { target.tell(msg, ActorRef.noSender()); } else { - logger.debug("Invalid component lifecycle msg: {}", msg); + log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg); } } @@ -161,14 +161,26 @@ public class AppActor extends RuleChainManagerActor { } private ActorRef getOrCreateTenantActor(TenantId tenantId) { - return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString())); + return tenantActors.computeIfAbsent(tenantId, k -> { + log.debug("[{}] Creating tenant actor.", tenantId); + ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) + .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()); + context().watch(tenantActor); + log.debug("[{}] Created tenant actor: {}.", tenantId, tenantActor); + return tenantActor; + }); } - private void processTermination(Terminated message) { + @Override + protected void processTermination(Terminated message) { ActorRef terminated = message.actor(); if (terminated instanceof LocalActorRef) { - logger.debug("Removed actor: {}", terminated); + boolean removed = tenantActors.inverse().remove(terminated) != null; + if (removed) { + log.debug("[{}] Removed actor:", terminated); + } else { + log.warn("[{}] Removed actor was not found in the tenant map!"); + } } else { throw new IllegalStateException("Remote actors are not supported!"); } @@ -182,20 +194,17 @@ public class AppActor extends RuleChainManagerActor { } @Override - public AppActor create() throws Exception { + public AppActor create() { return new AppActor(context); } } - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function() { - @Override - public Directive apply(Throwable t) { - logger.error(t, "Unknown failure"); - if (t instanceof RuntimeException) { - return SupervisorStrategy.restart(); - } else { - return SupervisorStrategy.stop(); - } + private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), t -> { + log.warn("Unknown failure", t); + if (t instanceof RuntimeException) { + return SupervisorStrategy.restart(); + } else { + return SupervisorStrategy.stop(); } }); } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 7b412b1328..f450027fb6 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -15,42 +15,38 @@ */ package org.thingsboard.server.actors.device; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; -import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; +@Slf4j public class DeviceActor extends ContextAwareActor { - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); - private final DeviceActorMessageProcessor processor; - private DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { + DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { super(systemContext); - this.processor = new DeviceActorMessageProcessor(systemContext, logger, tenantId, deviceId); + this.processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId); } @Override public void preStart() { - logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); + log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); try { processor.initSessionTimeout(context()); - logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); + log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); } catch (Exception e) { - logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId); + log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e); } } @@ -90,22 +86,4 @@ public class DeviceActor extends ContextAwareActor { return true; } - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; - - private final TenantId tenantId; - private final DeviceId deviceId; - - public ActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) { - super(context); - this.tenantId = tenantId; - this.deviceId = deviceId; - } - - @Override - public DeviceActor create() throws Exception { - return new DeviceActor(context, tenantId, deviceId); - } - } - } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java new file mode 100644 index 0000000000..18aa9268bd --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorCreator.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.device; + +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; + +public class DeviceActorCreator extends ContextBasedCreator { + private static final long serialVersionUID = 1L; + + private final TenantId tenantId; + private final DeviceId deviceId; + + public DeviceActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) { + super(context); + this.tenantId = tenantId; + this.deviceId = deviceId; + } + + @Override + public DeviceActor create() { + return new DeviceActor(context, tenantId, deviceId); + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index a16bd78a25..47417d1fd7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; @@ -88,6 +89,7 @@ import java.util.stream.Collectors; /** * @author Andrew Shvayka */ +@Slf4j class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { final TenantId tenantId; @@ -106,8 +108,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private String deviceType; private TbMsgMetaData defaultMetaData; - DeviceActorMessageProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, DeviceId deviceId) { - super(systemContext, logger); + DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) { + super(systemContext); this.tenantId = tenantId; this.deviceId = deviceId; this.sessions = new LinkedHashMap<>(); @@ -136,30 +138,30 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { long timeout = request.getExpirationTime() - System.currentTimeMillis(); if (timeout <= 0) { - logger.debug("[{}][{}] Ignoring message due to exp time reached", deviceId, request.getId(), request.getExpirationTime()); + log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime()); return; } boolean sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); - rpcSubscriptions.entrySet().forEach(sub -> { - sendToTransport(rpcRequest, sub.getKey(), sub.getValue().getNodeId()); - if (TransportProtos.SessionType.SYNC == sub.getValue().getType()) { - syncSessionSet.add(sub.getKey()); + rpcSubscriptions.forEach((key, value) -> { + sendToTransport(rpcRequest, key, value.getNodeId()); + if (TransportProtos.SessionType.SYNC == value.getType()) { + syncSessionSet.add(key); } }); syncSessionSet.forEach(rpcSubscriptions::remove); if (request.isOneway() && sent) { - logger.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); + log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId()); systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null)); } else { registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout); } if (sent) { - logger.debug("[{}] RPC request {} is sent!", deviceId, request.getId()); + log.debug("[{}] RPC request {} is sent!", deviceId, request.getId()); } else { - logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId()); + log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId()); } } @@ -172,7 +174,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void processServerSideRpcTimeout(ActorContext context, DeviceActorServerSideRpcTimeoutMsg msg) { ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId()); if (requestMd != null) { - logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); + log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); } @@ -181,13 +183,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private void sendPendingRequests(ActorContext context, UUID sessionId, SessionInfoProto sessionInfo) { TransportProtos.SessionType sessionType = getSessionType(sessionId); if (!toDeviceRpcPendingMap.isEmpty()) { - logger.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); + log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId); if (sessionType == TransportProtos.SessionType.SYNC) { - logger.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId); + log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } } else { - logger.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); + log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId); } Set sentOneWayIds = new HashSet<>(); if (sessionType == TransportProtos.SessionType.ASYNC) { @@ -335,7 +337,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { void processClientSideRpcTimeout(ActorContext context, DeviceActorClientSideRpcTimeoutMsg msg) { ToServerRpcRequestMetadata data = toServerRpcPendingMap.remove(msg.getId()); if (data != null) { - logger.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); + log.debug("[{}] Client side RPC request [{}] timeout detected!", deviceId, msg.getId()); sendToTransport(TransportProtos.ToServerRpcResponseMsg.newBuilder() .setRequestId(msg.getId()).setError("timeout").build() , data.getSessionId(), data.getNodeId()); @@ -380,7 +382,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { hasNotificationData = true; } } else { - logger.debug("[{}] No public server side attributes changed!", deviceId); + log.debug("[{}] No public server side attributes changed!", deviceId); } } } @@ -391,27 +393,27 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { }); } } else { - logger.debug("[{}] No registered attributes subscriptions to process!", deviceId); + log.debug("[{}] No registered attributes subscriptions to process!", deviceId); } } private void processRpcResponses(ActorContext context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) { UUID sessionId = getSessionId(sessionInfo); - logger.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId); + log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId); ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId()); boolean success = requestMd != null; if (success) { systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), responseMsg.getPayload(), null)); } else { - logger.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId()); } } private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { - logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); + log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); attributeSubscriptions.remove(sessionId); } else { SessionInfoMetaData sessionMD = sessions.get(sessionId); @@ -419,7 +421,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId())); } sessionMD.setSubscribedToAttributes(true); - logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); + log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); dumpSessions(); } @@ -432,7 +434,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { - logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); + log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } else { SessionInfoMetaData sessionMD = sessions.get(sessionId); @@ -440,7 +442,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId())); } sessionMD.setSubscribedToRPC(true); - logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); + log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); sendPendingRequests(context, sessionId, sessionInfo); dumpSessions(); @@ -451,10 +453,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { UUID sessionId = getSessionId(sessionInfo); if (msg.getEvent() == SessionEvent.OPEN) { if (sessions.containsKey(sessionId)) { - logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId); + log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId); return; } - logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); + log.debug("[{}] Processing new session [{}]", deviceId, sessionId); if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); if (sessionIdToRemove != null) { @@ -467,7 +469,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } dumpSessions(); } else if (msg.getEvent() == SessionEvent.CLOSED) { - logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); + log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); sessions.remove(sessionId); attributeSubscriptions.remove(sessionId); rpcSubscriptions.remove(sessionId); @@ -623,10 +625,10 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { } private void restoreSessions() { - logger.debug("[{}] Restoring sessions from cache", deviceId); + log.debug("[{}] Restoring sessions from cache", deviceId); TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId); if (sessionsDump.getSerializedSize() == 0) { - logger.debug("[{}] No session information found", deviceId); + log.debug("[{}] No session information found", deviceId); return; } for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) { @@ -644,13 +646,13 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { rpcSubscriptions.put(sessionId, sessionInfo); sessionMD.setSubscribedToRPC(true); } - logger.debug("[{}] Restored session: {}", deviceId, sessionMD); + log.debug("[{}] Restored session: {}", deviceId, sessionMD); } - logger.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); + log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); } private void dumpSessions() { - logger.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); + log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size()); List sessionsList = new ArrayList<>(sessions.size()); sessions.forEach((uuid, sessionMD) -> { if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) { @@ -668,7 +670,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder() .setSessionInfo(sessionInfoProto) .setSubscriptionInfo(subscriptionInfoProto).build()); - logger.debug("[{}] Dumping session: {}", deviceId, sessionMD); + log.debug("[{}] Dumping session: {}", deviceId, sessionMD); }); systemContext.getDeviceSessionCacheService() .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java index 31320ce5d2..922d2ba988 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java @@ -19,6 +19,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -35,17 +36,16 @@ import java.util.*; /** * @author Andrew Shvayka */ +@Slf4j public class RpcManagerActor extends ContextAwareActor { - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - private final Map sessionActors; private final Map> pendingMsgs; private final ServerAddress instance; - RpcManagerActor(ActorSystemContext systemContext) { + private RpcManagerActor(ActorSystemContext systemContext) { super(systemContext); this.sessionActors = new HashMap<>(); this.pendingMsgs = new HashMap<>(); @@ -116,7 +116,7 @@ public class RpcManagerActor extends ContextAwareActor { queue.add(msg); } } else { - logger.warning("Cluster msg doesn't have server address [{}]", msg); + log.warn("Cluster msg doesn't have server address [{}]", msg); } } @@ -207,7 +207,7 @@ public class RpcManagerActor extends ContextAwareActor { } @Override - public RpcManagerActor create() throws Exception { + public RpcManagerActor create() { return new RpcManagerActor(context); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java index dbad7c0b66..ed765e04f8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActor.java @@ -33,7 +33,7 @@ public class RuleChainActor extends ComponentActor { private static final long DEFAULT_CLUSTER_PARTITION = 0L; @@ -69,8 +71,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(); @@ -216,7 +218,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor { private final RuleChainId ruleChainId; @@ -32,7 +34,7 @@ public class RuleNodeActor extends ComponentActor> extends ContextAwareActor { - protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); - private long lastPersistedErrorTs = 0L; protected final TenantId tenantId; protected final T id; @@ -60,7 +60,7 @@ public abstract class ComponentActor getErrorPersistFrequency()) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java index 3624127194..c9f1dcde3e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/ContextAwareActor.java @@ -15,14 +15,16 @@ */ package org.thingsboard.server.actors.service; +import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.TbActorMsg; +@Slf4j public abstract class ContextAwareActor extends UntypedActor { - protected final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); public static final int ENTITY_PACK_LIMIT = 1024; @@ -35,21 +37,26 @@ public abstract class ContextAwareActor extends UntypedActor { @Override public void onReceive(Object msg) throws Exception { - if (logger.isDebugEnabled()) { - logger.debug("Processing msg: {}", msg); + if (log.isDebugEnabled()) { + log.debug("Processing msg: {}", msg); } if (msg instanceof TbActorMsg) { try { if (!process((TbActorMsg) msg)) { - logger.warning("Unknown message: {}!", msg); + log.warn("Unknown message: {}!", msg); } } catch (Exception e) { throw e; } + } else if (msg instanceof Terminated) { + processTermination((Terminated) msg); } else { - logger.warning("Unknown message: {}!", msg); + log.warn("Unknown message: {}!", msg); } } + protected void processTermination(Terminated msg) { + } + protected abstract boolean process(TbActorMsg msg); } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java index c8097821d9..b707baf14e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java @@ -22,22 +22,22 @@ import akka.event.LoggingAdapter; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import scala.concurrent.ExecutionContextExecutor; import scala.concurrent.duration.Duration; import java.util.concurrent.TimeUnit; +@Slf4j public abstract class AbstractContextAwareMsgProcessor { protected final ActorSystemContext systemContext; - protected final LoggingAdapter logger; protected final ObjectMapper mapper = new ObjectMapper(); - protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger) { + protected AbstractContextAwareMsgProcessor(ActorSystemContext systemContext) { super(); this.systemContext = systemContext; - this.logger = logger; } private Scheduler getScheduler() { @@ -53,7 +53,7 @@ public abstract class AbstractContextAwareMsgProcessor { } private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) { - logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); + log.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); } @@ -62,7 +62,7 @@ public abstract class AbstractContextAwareMsgProcessor { } private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { - logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); + log.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java index c9dc307451..9e5d063223 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/ComponentMsgProcessor.java @@ -19,6 +19,7 @@ import akka.actor.ActorContext; import akka.event.LoggingAdapter; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.stats.StatsPersistTick; import org.thingsboard.server.common.data.id.EntityId; @@ -30,14 +31,15 @@ import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; import javax.annotation.Nullable; import java.util.function.Consumer; +@Slf4j public abstract class ComponentMsgProcessor extends AbstractContextAwareMsgProcessor { protected final TenantId tenantId; protected final T entityId; protected ComponentLifecycleState state; - protected ComponentMsgProcessor(ActorSystemContext systemContext, LoggingAdapter logger, TenantId tenantId, T id) { - super(systemContext, logger); + protected ComponentMsgProcessor(ActorSystemContext systemContext, TenantId tenantId, T id) { + super(systemContext); this.tenantId = tenantId; this.entityId = id; } @@ -79,7 +81,7 @@ public abstract class ComponentMsgProcessor extends Abstract protected void checkActive() { if (state != ComponentLifecycleState.ACTIVE) { - logger.warning("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId); + log.warn("Rule chain is not active. Current state [{}] for processor [{}] tenant [{}]", state, tenantId, entityId); throw new IllegalStateException("Rule chain is not active! " + entityId + " - " + tenantId); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java index 1b9e6a8074..dd03d69a62 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/EntityActorsManager.java @@ -20,6 +20,8 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; @@ -39,11 +41,11 @@ import java.util.Map; public abstract class EntityActorsManager> { protected final ActorSystemContext systemContext; - protected final Map actors; + protected final BiMap actors; public EntityActorsManager(ActorSystemContext systemContext) { this.systemContext = systemContext; - this.actors = new HashMap<>(); + this.actors = HashBiMap.create(); } protected abstract TenantId getTenantId(); @@ -65,7 +67,8 @@ public abstract class EntityActorsManager diff --git a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java index 8623370896..79aa6da964 100644 --- a/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/stats/StatsActor.java @@ -15,10 +15,9 @@ */ package org.thingsboard.server.actors.stats; -import akka.event.Logging; -import akka.event.LoggingAdapter; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ContextAwareActor; import org.thingsboard.server.actors.service.ContextBasedCreator; @@ -27,9 +26,9 @@ import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; +@Slf4j public class StatsActor extends ContextAwareActor { - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); private final ObjectMapper mapper = new ObjectMapper(); public StatsActor(ActorSystemContext context) { @@ -43,13 +42,13 @@ public class StatsActor extends ContextAwareActor { } @Override - public void onReceive(Object msg) throws Exception { - logger.debug("Received message: {}", msg); + public void onReceive(Object msg) { + log.debug("Received message: {}", msg); if (msg instanceof StatsPersistMsg) { try { onStatsPersistMsg((StatsPersistMsg) msg); } catch (Exception e) { - logger.warning("Failed to persist statistics: {}", msg, e); + log.warn("Failed to persist statistics: {}", msg, e); } } } @@ -75,7 +74,7 @@ public class StatsActor extends ContextAwareActor { } @Override - public StatsActor create() throws Exception { + public StatsActor create() { return new StatsActor(context); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 721d828fd6..b76e2984ae 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -17,15 +17,19 @@ package org.thingsboard.server.actors.tenant; import akka.actor.ActorInitializationException; import akka.actor.ActorRef; +import akka.actor.LocalActorRef; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.actor.Terminated; import akka.japi.Function; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.device.DeviceActor; +import org.thingsboard.server.actors.device.DeviceActorCreator; import org.thingsboard.server.actors.device.DeviceActorToRuleEngineMsg; import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor; -import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.rulechain.TenantRuleChainManager; @@ -44,18 +48,18 @@ import scala.concurrent.duration.Duration; import java.util.HashMap; import java.util.Map; +@Slf4j public class TenantActor extends RuleChainManagerActor { private final TenantId tenantId; - private final Map deviceActors; + private final BiMap deviceActors; private TenantActor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext, new TenantRuleChainManager(systemContext, tenantId)); this.tenantId = tenantId; - this.deviceActors = new HashMap<>(); + this.deviceActors = HashBiMap.create(); } - @Override public SupervisorStrategy supervisorStrategy() { return strategy; @@ -63,12 +67,12 @@ public class TenantActor extends RuleChainManagerActor { @Override public void preStart() { - logger.info("[{}] Starting tenant actor.", tenantId); + log.info("[{}] Starting tenant actor.", tenantId); try { initRuleChains(); - logger.info("[{}] Tenant actor started.", tenantId); + log.info("[{}] Tenant actor started.", tenantId); } catch (Exception e) { - logger.error(e, "[{}] Unknown failure", tenantId); + log.warn("[{}] Unknown failure", tenantId, e); } } @@ -105,22 +109,20 @@ public class TenantActor extends RuleChainManagerActor { return true; } - @Override - protected void broadcast(Object msg) { - super.broadcast(msg); -// deviceActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); - } - private void onServiceToRuleEngineMsg(ServiceToRuleEngineMsg msg) { - if (ruleChainManager.getRootChainActor()!=null) - ruleChainManager.getRootChainActor().tell(msg, self()); - else logger.info("[{}] No Root Chain", msg); + if (ruleChainManager.getRootChainActor() != null) { + ruleChainManager.getRootChainActor().tell(msg, self()); + } else { + log.info("[{}] No Root Chain: {}", tenantId, msg); + } } private void onDeviceActorToRuleEngineMsg(DeviceActorToRuleEngineMsg msg) { - if (ruleChainManager.getRootChainActor()!=null) - ruleChainManager.getRootChainActor().tell(msg, self()); - else logger.info("[{}] No Root Chain", msg); + if (ruleChainManager.getRootChainActor() != null) { + ruleChainManager.getRootChainActor().tell(msg, self()); + } else { + log.info("[{}] No Root Chain: {}", tenantId, msg); + } } private void onRuleChainMsg(RuleChainAwareMsg msg) { @@ -141,13 +143,35 @@ public class TenantActor extends RuleChainManagerActor { } target.tell(msg, ActorRef.noSender()); } else { - logger.debug("Invalid component lifecycle msg: {}", msg); + log.debug("[{}] Invalid component lifecycle msg: {}", tenantId, msg); } } private ActorRef getOrCreateDeviceActor(DeviceId deviceId) { - return deviceActors.computeIfAbsent(deviceId, k -> context().actorOf(Props.create(new DeviceActor.ActorCreator(systemContext, tenantId, deviceId)) - .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), deviceId.toString())); + return deviceActors.computeIfAbsent(deviceId, k -> { + log.debug("[{}][{}] Creating device actor.", tenantId, deviceId); + ActorRef deviceActor = context().actorOf(Props.create(new DeviceActorCreator(systemContext, tenantId, deviceId)) + .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME) + , deviceId.toString()); + context().watch(deviceActor); + log.debug("[{}][{}] Created device actor: {}.", tenantId, deviceId, deviceActor); + return deviceActor; + }); + } + + @Override + protected void processTermination(Terminated message) { + ActorRef terminated = message.actor(); + if (terminated instanceof LocalActorRef) { + boolean removed = deviceActors.inverse().remove(terminated) != null; + if (removed) { + log.debug("[{}] Removed actor:", terminated); + } else { + log.warn("[{}] Removed actor was not found in the device map!"); + } + } else { + throw new IllegalStateException("Remote actors are not supported!"); + } } public static class ActorCreator extends ContextBasedCreator { @@ -161,7 +185,7 @@ public class TenantActor extends RuleChainManagerActor { } @Override - public TenantActor create() throws Exception { + public TenantActor create() { return new TenantActor(context, tenantId); } } @@ -169,8 +193,8 @@ public class TenantActor extends RuleChainManagerActor { private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { - logger.error(t, "Unknown failure"); - if(t instanceof ActorInitializationException){ + log.warn("[{}] Unknown failure", tenantId, t); + if (t instanceof ActorInitializationException) { return SupervisorStrategy.stop(); } else { return SupervisorStrategy.resume(); diff --git a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java index 450b9761e9..0bb6a6683d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DashboardController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DashboardController.java @@ -52,7 +52,6 @@ public class DashboardController extends BaseController { public static final String DASHBOARD_ID = "dashboardId"; @Value("${dashboard.max_datapoints_limit}") - @Getter private long maxDatapointsLimit; diff --git a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java index 544b6fdea4..68d9ada212 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java +++ b/application/src/main/java/org/thingsboard/server/service/security/model/token/RawAccessJwtToken.java @@ -22,6 +22,7 @@ import io.jsonwebtoken.Jwts; import io.jsonwebtoken.MalformedJwtException; import io.jsonwebtoken.SignatureException; import io.jsonwebtoken.UnsupportedJwtException; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.authentication.BadCredentialsException; @@ -29,12 +30,11 @@ import org.thingsboard.server.service.security.exception.JwtExpiredTokenExceptio import java.io.Serializable; +@Slf4j public class RawAccessJwtToken implements JwtToken, Serializable { private static final long serialVersionUID = -797397445703066079L; - private static Logger logger = LoggerFactory.getLogger(RawAccessJwtToken.class); - private String token; public RawAccessJwtToken(String token) { @@ -52,10 +52,10 @@ public class RawAccessJwtToken implements JwtToken, Serializable { try { return Jwts.parser().setSigningKey(signingKey).parseClaimsJws(this.token); } catch (UnsupportedJwtException | MalformedJwtException | IllegalArgumentException | SignatureException ex) { - logger.error("Invalid JWT Token", ex); + log.error("Invalid JWT Token", ex); throw new BadCredentialsException("Invalid JWT token: ", ex); } catch (ExpiredJwtException expiredEx) { - logger.info("JWT Token is expired", expiredEx); + log.info("JWT Token is expired", expiredEx); throw new JwtExpiredTokenException(this, "JWT Token expired", expiredEx); } } diff --git a/application/src/main/resources/actor-system.conf b/application/src/main/resources/actor-system.conf index 763319e52c..28673f6377 100644 --- a/application/src/main/resources/actor-system.conf +++ b/application/src/main/resources/actor-system.conf @@ -19,7 +19,7 @@ akka { # JVM shutdown, System.exit(-1), in case of a fatal error, # such as OutOfMemoryError jvm-exit-on-fatal-error = off - loglevel = "DEBUG" + loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] }