Improved reporting of last activity time from remote transport
This commit is contained in:
		
							parent
							
								
									752b3d90ab
								
							
						
					
					
						commit
						f9abf1c7a2
					
				@ -27,6 +27,7 @@ import org.thingsboard.server.common.transport.TransportService;
 | 
				
			|||||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
 | 
					import org.thingsboard.server.common.transport.TransportServiceCallback;
 | 
				
			||||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
					import org.thingsboard.server.gen.transport.TransportProtos;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.util.Random;
 | 
				
			||||||
import java.util.UUID;
 | 
					import java.util.UUID;
 | 
				
			||||||
import java.util.concurrent.*;
 | 
					import java.util.concurrent.*;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -176,10 +177,23 @@ public abstract class AbstractTransportService implements TransportService {
 | 
				
			|||||||
                sessions.remove(uuid);
 | 
					                sessions.remove(uuid);
 | 
				
			||||||
                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
 | 
					                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
 | 
					                if (sessionMD.getLastActivityTime() > sessionMD.getLastReportedActivityTime()) {
 | 
				
			||||||
                        .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
 | 
					                    final long lastActivityTime = sessionMD.getLastActivityTime();
 | 
				
			||||||
                        .setRpcSubscription(sessionMD.isSubscribedToRPC())
 | 
					                    process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
 | 
				
			||||||
                        .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null);
 | 
					                            .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
 | 
				
			||||||
 | 
					                            .setRpcSubscription(sessionMD.isSubscribedToRPC())
 | 
				
			||||||
 | 
					                            .setLastActivityTime(sessionMD.getLastActivityTime()).build(), new TransportServiceCallback<Void>() {
 | 
				
			||||||
 | 
					                        @Override
 | 
				
			||||||
 | 
					                        public void onSuccess(Void msg) {
 | 
				
			||||||
 | 
					                            sessionMD.setLastReportedActivityTime(lastActivityTime);
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        @Override
 | 
				
			||||||
 | 
					                        public void onError(Throwable e) {
 | 
				
			||||||
 | 
					                            log.warn("[{}] Failed to report last activity time", uuid, e);
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    });
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -288,7 +302,7 @@ public abstract class AbstractTransportService implements TransportService {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
 | 
					        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
 | 
				
			||||||
        this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
 | 
					        this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
 | 
				
			||||||
        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
 | 
					        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void destroy() {
 | 
					    public void destroy() {
 | 
				
			||||||
 | 
				
			|||||||
@ -34,6 +34,7 @@ class SessionMetaData {
 | 
				
			|||||||
    private ScheduledFuture scheduledFuture;
 | 
					    private ScheduledFuture scheduledFuture;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private volatile long lastActivityTime;
 | 
					    private volatile long lastActivityTime;
 | 
				
			||||||
 | 
					    private volatile long lastReportedActivityTime;
 | 
				
			||||||
    private volatile boolean subscribedToAttributes;
 | 
					    private volatile boolean subscribedToAttributes;
 | 
				
			||||||
    private volatile boolean subscribedToRPC;
 | 
					    private volatile boolean subscribedToRPC;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user