From 5c35a858bb7c63e5eb32e73d1d7991ee9d3b465c Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Thu, 23 Nov 2023 15:39:02 +0200 Subject: [PATCH] [WIP] Initial implementation for "first and last" strategy in transport service --- .../service/DefaultTransportService.java | 239 +++++++++++++++++- 1 file changed, 230 insertions(+), 9 deletions(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 37b77a5268..0ece919d7b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -52,7 +52,6 @@ import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -96,9 +95,9 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; import org.thingsboard.server.queue.scheduler.SchedulerComponent; @@ -113,6 +112,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -125,6 +125,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -139,7 +141,7 @@ public class DefaultTransportService implements TransportService { public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!"; public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN); public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED); - public static final TransportProtos.SessionCloseNotificationProto SESSION_CLOSE_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder() + public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder() .setMessage(SESSION_EXPIRED_MESSAGE).build(); public static final TransportProtos.SubscribeToAttributeUpdatesMsg SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG = TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC).build(); @@ -202,7 +204,7 @@ public class DefaultTransportService implements TransportService { private ExecutorService mainConsumerExecutor; public final ConcurrentMap sessions = new ConcurrentHashMap<>(); - private final ConcurrentMap sessionsActivity = new ConcurrentHashMap<>(); + private final ActivityStateManager activityStateManager; private final Map toServerRpcPendingMap = new ConcurrentHashMap<>(); private volatile boolean stopped = false; @@ -234,6 +236,7 @@ public class DefaultTransportService implements TransportService { this.eventPublisher = eventPublisher; this.notificationRuleProcessor = notificationRuleProcessor; this.entityLimitsCache = entityLimitsCache; + activityStateManager = new ActivityStateManager(); } @PostConstruct @@ -242,7 +245,7 @@ public class DefaultTransportService implements TransportService { this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); - this.scheduler.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); + activityStateManager.init(); this.scheduler.scheduleAtFixedRate(this::invalidateRateLimits, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate(); transportApiRequestTemplate.setMessagesStats(transportApiStats); @@ -785,11 +788,10 @@ public class DefaultTransportService implements TransportService { } private void reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { - UUID sessionId = toSessionId(sessionInfo); - SessionActivityData sessionMetaData = sessionsActivity.computeIfAbsent(sessionId, id -> new SessionActivityData(sessionInfo)); - sessionMetaData.updateLastActivityTime(); + activityStateManager.recordActivity(sessionInfo); } + /* private void checkInactivityAndReportActivity() { long expTime = System.currentTimeMillis() - sessionInactivityTimeout; Set sessionsToRemove = new HashSet<>(); @@ -819,7 +821,7 @@ public class DefaultTransportService implements TransportService { sessions.remove(uuid); sessionsToRemove.add(uuid); process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); - sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_CLOSE_NOTIFICATION_PROTO); + sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_EXPIRED_NOTIFICATION_PROTO); } } else { if (lastActivityTime > sessionAD.getLastReportedActivityTime()) { @@ -844,6 +846,225 @@ public class DefaultTransportService implements TransportService { // Removes all closed or short-lived sessions. sessionsToRemove.forEach(sessionsActivity::remove); } + */ + + + // TODO: how can I get access to sessions if activity management logic is implemented as a separate service (class) + + // TODO: why sessions and session activities are managed in a separate maps? maybe it is better to manage in a single map + // (eg. we can check if we had sent last activity event when deregistering session) + + // TODO: currently if activity event is received between precise session expiration time and reportLastEventAndStartNewPeriod() call + // we will "resurrect" this session meaning that + + // TODO: I optimistically set alreadyBeenReported status to true to avoid reporting several activity events if they arrive in rapid succession + // this can cause lost activity updates if first event that got reported failed to enqueue in Kafka and next reporting period is already started + // (maybe having a queue of "first" events will help with this, queue will be cleared once event was successfully queued or later time was reported by event in next period) + // setting alreadyBeenReported status in callbacks ensures that events are reported but can cause several "first" events to be reported + + // TODO: currently activity states are reported on per session basis, but one physical device can have several sessions + // maybe it is a good idea to manage activity state on per device basis + + private final class ActivityStateManager { + + private final ConcurrentMap sessionActivityStates = new ConcurrentHashMap<>(); + + @lombok.Value + // TODO: I chose to have immutable objects for concurrency considerations, + // but it can possibly create a performance issue with creating large amounts of new objects + // maybe mutable objects with volatile fields is better + private class ActivityState { + + TransportProtos.SessionInfoProto sessionInfo; + long lastActivityTime; + boolean alreadyBeenReported; + long lastReportedTime; + + ActivityState withSessionInfo(TransportProtos.SessionInfoProto sessionInfo) { + return new ActivityState(sessionInfo, getLastActivityTime(), isAlreadyBeenReported(), getLastReportedTime()); + } + + ActivityState withLastActivityTime(long lastActivityTime) { + return new ActivityState(getSessionInfo(), lastActivityTime, isAlreadyBeenReported(), getLastReportedTime()); + } + + ActivityState withReportedStatus(boolean hasAlreadyBeenReported) { + return new ActivityState(getSessionInfo(), getLastActivityTime(), hasAlreadyBeenReported, getLastReportedTime()); + } + + ActivityState withLastReportedTime(long lastReportedTime) { + return new ActivityState(getSessionInfo(), getLastActivityTime(), isAlreadyBeenReported(), lastReportedTime); + } + + } + + // TODO: maybe CAS-based synchronization policy will perform better? + // locks can put threads to sleep which introduces scheduling overhead (it is up to JVM to device if a thread will go to sleep or will be spin-waiting) + // activity management tasks are short lived so scheduling overhead may be significant, so CAS can be better since it does not put threads to sleep (always spin-waiting) + // Compare ReadWriteLock to CAS + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final AtomicInteger currentPeriodId = new AtomicInteger(); + + private void init() { + scheduler.scheduleAtFixedRate(this::reportLastEventAndStartNewPeriod, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); + } + + private void recordActivity(TransportProtos.SessionInfoProto sessionInfo) { + long newLastActivityTime = System.currentTimeMillis(); + var sessionId = toSessionId(sessionInfo); + lock.readLock().lock(); + try { + sessionActivityStates.compute(sessionId, (id, currentActivityState) -> { + log.info("------------------------------------------------------------------------------------------------------------------------"); + log.info("Record activity: entered compute! Session id: [{}]", sessionId); + var activityState = Objects.requireNonNullElseGet(currentActivityState, () -> new ActivityState(sessionInfo, newLastActivityTime, false, 0L)); + if (activityState.isAlreadyBeenReported()) { // update the last activity time + log.info("------------------------------------------------------------------------------------------------------------------------"); + return activityState.withLastActivityTime(newLastActivityTime); + } + int capturedPeriodId = currentPeriodId.get(); + reportActivityStateToCore(activityState.getSessionInfo(), newLastActivityTime, new TransportServiceCallback<>() { + @Override + public void onSuccess(Void msgAcknowledged) { + // Success, the optimistic assumption was correct: update last reported time if it is newer + log.info("Record activity: successful callback received! Session id: [{}]", sessionId); + lock.readLock().lock(); + try { + updateLastReportedTime(sessionId, newLastActivityTime); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void onError(Throwable e) { + // Error: revert alreadyBeenReported to false + log.info("Record activity: failed callback received! Session id: [{}]", sessionId); + log.debug("[{}] Failed to report last activity time!", sessionId, e); + lock.readLock().lock(); + try { + sessionActivityStates.computeIfPresent(sessionId, (__, activityState) -> { + boolean updatedReportedStatus = activityState.isAlreadyBeenReported(); + if (capturedPeriodId == currentPeriodId.get() && activityState.getLastReportedTime() < newLastActivityTime) { + updatedReportedStatus = false; + } + return activityState.withReportedStatus(updatedReportedStatus); + }); + } finally { + lock.readLock().unlock(); + } + } + }); + log.info("------------------------------------------------------------------------------------------------------------------------"); + // Optimistically set hasAlreadyBeenReported to true + return new ActivityState(sessionInfo, newLastActivityTime, true, activityState.getLastReportedTime()); + }); + } finally { + lock.readLock().unlock(); + } + } + + private void reportLastEventAndStartNewPeriod() { + lock.writeLock().lock(); + // log.info("------------------------------------------------------------------------------------------------------------------------"); + // log.info("Period change start! Current period id: [{}], activity states: {}", currentPeriodId.get(), sessionActivityStates.keySet()); + try { + Set activityStatesToRemove = new HashSet<>(); + long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; + for (Map.Entry entry : sessionActivityStates.entrySet()) { + var sessionId = entry.getKey(); + var activityState = entry.getValue(); + + long lastActivityTime = activityState.getLastActivityTime(); + + // TODO: why this update is needed? if gateway session is present but this session is already gone we will not check if gateway's last activity time is greater than this one + SessionMetaData sessionMetaData = sessions.get(sessionId); + if (sessionMetaData != null) { + activityState = activityState.withSessionInfo(sessionMetaData.getSessionInfo()); + entry.setValue(activityState); + } else { + // log.info("Removing activity state! Session id: [{}]", sessionId); + activityStatesToRemove.add(sessionId); + } + // TODO: ask about this part end + + TransportProtos.SessionInfoProto sessionInfo = activityState.getSessionInfo(); + + if (sessionInfo.getGwSessionIdMSB() != 0 && sessionInfo.getGwSessionIdLSB() != 0) { + var gwSessionId = new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()); + SessionMetaData gwSessionMetaData = sessions.get(gwSessionId); + if (gwSessionMetaData != null && gwSessionMetaData.isOverwriteActivityTime()) { + ActivityState gwActivityState = sessionActivityStates.get(gwSessionId); + lastActivityTime = Math.max(gwActivityState.getLastActivityTime(), lastActivityTime); + } + } + + if (sessionMetaData != null && lastActivityTime < expirationTime) { + // log.info("Session has expired! Session id: [{}]", sessionId); + if (log.isDebugEnabled()) { + log.debug("[{}] Session has expired due to last activity time: {}!", sessionId, lastActivityTime); + } + sessions.remove(sessionId); + activityStatesToRemove.add(sessionId); + process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); + sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); + } else if (activityState.getLastReportedTime() < lastActivityTime) { + long finalLastActivityTime = lastActivityTime; + reportActivityStateToCore(sessionInfo, sessionMetaData, lastActivityTime, new TransportServiceCallback<>() { + @Override + public void onSuccess(Void msgAcknowledged) { + lock.readLock().lock(); + try { + updateLastReportedTime(sessionId, finalLastActivityTime); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void onError(Throwable e) { + log.debug("[{}] Failed to report last activity time!", sessionId, e); + } + }); + } + entry.setValue(activityState.withReportedStatus(false)); + } + activityStatesToRemove.forEach(sessionActivityStates::remove); + } finally { + currentPeriodId.incrementAndGet(); + // log.info("Period change end! Current period id: [{}], activity states {}", currentPeriodId.get(), sessionActivityStates.keySet()); + // log.info("------------------------------------------------------------------------------------------------------------------------"); + lock.writeLock().unlock(); + } + } + + private void reportActivityStateToCore(TransportProtos.SessionInfoProto sessionInfo, long lastActivityTime, TransportServiceCallback msgAcknowledgedCallback) { + SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo)); + reportActivityStateToCore(sessionInfo, sessionMetaData, lastActivityTime, msgAcknowledgedCallback); + } + + private void reportActivityStateToCore( + TransportProtos.SessionInfoProto sessionInfo, SessionMetaData sessionMetaData, long lastActivityTime, TransportServiceCallback msgAcknowledgedCallback + ) { + log.info("Reporting activity state to core! Session id: [{}], last activity time: [{}], current period id: [{}]", + toSessionId(sessionInfo), lastActivityTime, currentPeriodId.get()); + TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToAttributes()) + .setRpcSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToRPC()) + .setLastActivityTime(lastActivityTime) + .build(); + process(sessionInfo, subscriptionInfo, msgAcknowledgedCallback); + } + + private void updateLastReportedTime(UUID sessionId, long lastActivityTime) { + sessionActivityStates.computeIfPresent(sessionId, (__, currentActivityState) -> { + long updatedLastReportedTime = Math.max(currentActivityState.getLastReportedTime(), lastActivityTime); + return currentActivityState.withLastReportedTime(updatedLastReportedTime); + }); + } + + } + @Override public void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error) {