Merge pull request #5889 from thingsboard/device-session-timeout-improvement
[3.3.3] Reduce number of scheduled messages for session timeout
This commit is contained in:
		
						commit
						5b9618dfeb
					
				@ -18,9 +18,12 @@ package org.thingsboard.server.actors.app;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.TbActor;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorCtx;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorException;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorId;
 | 
			
		||||
import org.thingsboard.server.actors.TbActorRef;
 | 
			
		||||
import org.thingsboard.server.actors.TbEntityActorId;
 | 
			
		||||
import org.thingsboard.server.actors.device.SessionTimeoutCheckMsg;
 | 
			
		||||
import org.thingsboard.server.actors.service.ContextAwareActor;
 | 
			
		||||
import org.thingsboard.server.actors.service.ContextBasedCreator;
 | 
			
		||||
import org.thingsboard.server.actors.service.DefaultActorService;
 | 
			
		||||
@ -64,6 +67,15 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
        this.deletedTenants = new HashSet<>();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void init(TbActorCtx ctx) throws TbActorException {
 | 
			
		||||
        super.init(ctx);
 | 
			
		||||
        if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {
 | 
			
		||||
            systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),
 | 
			
		||||
                    systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected boolean doProcess(TbActorMsg msg) {
 | 
			
		||||
        if (!ruleChainsInitialized) {
 | 
			
		||||
@ -101,6 +113,9 @@ public class AppActor extends ContextAwareActor {
 | 
			
		||||
            case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
 | 
			
		||||
                onToTenantActorMsg((EdgeEventUpdateMsg) msg);
 | 
			
		||||
                break;
 | 
			
		||||
            case SESSION_TIMEOUT_MSG:
 | 
			
		||||
                ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
 | 
			
		||||
                break;
 | 
			
		||||
            default:
 | 
			
		||||
                return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -939,7 +939,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void init(TbActorCtx ctx) {
 | 
			
		||||
        schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(), systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
 | 
			
		||||
        PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
 | 
			
		||||
        PageData<Rpc> pageData;
 | 
			
		||||
        do {
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ import org.thingsboard.server.actors.TbActorRef;
 | 
			
		||||
import org.thingsboard.server.actors.TbEntityActorId;
 | 
			
		||||
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
 | 
			
		||||
import org.thingsboard.server.actors.device.DeviceActorCreator;
 | 
			
		||||
import org.thingsboard.server.actors.device.SessionTimeoutCheckMsg;
 | 
			
		||||
import org.thingsboard.server.actors.ruleChain.RuleChainInputMsg;
 | 
			
		||||
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
 | 
			
		||||
import org.thingsboard.server.actors.ruleChain.RuleChainOutputMsg;
 | 
			
		||||
@ -170,6 +171,9 @@ public class TenantActor extends RuleChainManagerActor {
 | 
			
		||||
            case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
 | 
			
		||||
                onToDeviceActorMsg((DeviceAwareMsg) msg, true);
 | 
			
		||||
                break;
 | 
			
		||||
            case SESSION_TIMEOUT_MSG:
 | 
			
		||||
                ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
 | 
			
		||||
                break;
 | 
			
		||||
            case RULE_CHAIN_INPUT_MSG:
 | 
			
		||||
            case RULE_CHAIN_OUTPUT_MSG:
 | 
			
		||||
            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -35,6 +36,8 @@ public interface TbActorCtx extends TbActorRef {
 | 
			
		||||
 | 
			
		||||
    void broadcastToChildren(TbActorMsg msg);
 | 
			
		||||
 | 
			
		||||
    void broadcastToChildrenByType(TbActorMsg msg, EntityType entityType);
 | 
			
		||||
 | 
			
		||||
    void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter);
 | 
			
		||||
 | 
			
		||||
    List<TbActorId> filterChildren(Predicate<TbActorId> childFilter);
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,16 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
 | 
			
		||||
public interface TbActorId {
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Returns entity type of the actor.
 | 
			
		||||
     * May return null if the actor does not belong to any entity.
 | 
			
		||||
     * This method is added for performance optimization.
 | 
			
		||||
     *
 | 
			
		||||
     */
 | 
			
		||||
    EntityType getEntityType();
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.msg.MsgType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbActorStopReason;
 | 
			
		||||
@ -139,7 +140,7 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
                try {
 | 
			
		||||
                    log.debug("[{}] Going to process message: {}", selfId, msg);
 | 
			
		||||
                    actor.process(msg);
 | 
			
		||||
                } catch (TbRuleNodeUpdateException updateException){
 | 
			
		||||
                } catch (TbRuleNodeUpdateException updateException) {
 | 
			
		||||
                    stopReason = TbActorStopReason.INIT_FAILED;
 | 
			
		||||
                    destroy();
 | 
			
		||||
                } catch (Throwable t) {
 | 
			
		||||
@ -177,6 +178,11 @@ public final class TbActorMailbox implements TbActorCtx {
 | 
			
		||||
        system.broadcastToChildren(selfId, msg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void broadcastToChildrenByType(TbActorMsg msg, EntityType entityType) {
 | 
			
		||||
        broadcastToChildren(msg, actorId -> entityType.equals(actorId.getEntityType()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void broadcastToChildren(TbActorMsg msg, Predicate<TbActorId> childFilter) {
 | 
			
		||||
        system.broadcastToChildren(selfId, childFilter, msg);
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import lombok.Getter;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
@ -46,4 +47,9 @@ public class TbEntityActorId implements TbActorId {
 | 
			
		||||
    public int hashCode() {
 | 
			
		||||
        return Objects.hash(entityId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public EntityType getEntityType() {
 | 
			
		||||
        return entityId.getEntityType();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,8 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.actors;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
 | 
			
		||||
public class TbStringActorId implements TbActorId {
 | 
			
		||||
@ -42,4 +44,9 @@ public class TbStringActorId implements TbActorId {
 | 
			
		||||
    public int hashCode() {
 | 
			
		||||
        return Objects.hash(id);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public EntityType getEntityType() {
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user