[WIP] Initial implementation for "first and last" strategy in transport service

This commit is contained in:
Dmytro Skarzhynets 2023-11-23 15:39:02 +02:00
parent b7311a089f
commit 5c35a858bb

View File

@ -52,7 +52,6 @@ import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -96,9 +95,9 @@ import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
@ -113,6 +112,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@ -125,6 +125,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -139,7 +141,7 @@ public class DefaultTransportService implements TransportService {
public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!";
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN);
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED);
public static final TransportProtos.SessionCloseNotificationProto SESSION_CLOSE_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder()
public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder()
.setMessage(SESSION_EXPIRED_MESSAGE).build();
public static final TransportProtos.SubscribeToAttributeUpdatesMsg SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG = TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder()
.setSessionType(TransportProtos.SessionType.ASYNC).build();
@ -202,7 +204,7 @@ public class DefaultTransportService implements TransportService {
private ExecutorService mainConsumerExecutor;
public final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, SessionActivityData> sessionsActivity = new ConcurrentHashMap<>();
private final ActivityStateManager activityStateManager;
private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
private volatile boolean stopped = false;
@ -234,6 +236,7 @@ public class DefaultTransportService implements TransportService {
this.eventPublisher = eventPublisher;
this.notificationRuleProcessor = notificationRuleProcessor;
this.entityLimitsCache = entityLimitsCache;
activityStateManager = new ActivityStateManager();
}
@PostConstruct
@ -242,7 +245,7 @@ public class DefaultTransportService implements TransportService {
this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
this.transportCallbackExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
this.scheduler.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
activityStateManager.init();
this.scheduler.scheduleAtFixedRate(this::invalidateRateLimits, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
transportApiRequestTemplate.setMessagesStats(transportApiStats);
@ -785,11 +788,10 @@ public class DefaultTransportService implements TransportService {
}
private void reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
UUID sessionId = toSessionId(sessionInfo);
SessionActivityData sessionMetaData = sessionsActivity.computeIfAbsent(sessionId, id -> new SessionActivityData(sessionInfo));
sessionMetaData.updateLastActivityTime();
activityStateManager.recordActivity(sessionInfo);
}
/*
private void checkInactivityAndReportActivity() {
long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
Set<UUID> sessionsToRemove = new HashSet<>();
@ -819,7 +821,7 @@ public class DefaultTransportService implements TransportService {
sessions.remove(uuid);
sessionsToRemove.add(uuid);
process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_CLOSE_NOTIFICATION_PROTO);
sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_EXPIRED_NOTIFICATION_PROTO);
}
} else {
if (lastActivityTime > sessionAD.getLastReportedActivityTime()) {
@ -844,6 +846,225 @@ public class DefaultTransportService implements TransportService {
// Removes all closed or short-lived sessions.
sessionsToRemove.forEach(sessionsActivity::remove);
}
*/
// TODO: how can I get access to sessions if activity management logic is implemented as a separate service (class)
// TODO: why sessions and session activities are managed in a separate maps? maybe it is better to manage in a single map
// (eg. we can check if we had sent last activity event when deregistering session)
// TODO: currently if activity event is received between precise session expiration time and reportLastEventAndStartNewPeriod() call
// we will "resurrect" this session meaning that
// TODO: I optimistically set alreadyBeenReported status to true to avoid reporting several activity events if they arrive in rapid succession
// this can cause lost activity updates if first event that got reported failed to enqueue in Kafka and next reporting period is already started
// (maybe having a queue of "first" events will help with this, queue will be cleared once event was successfully queued or later time was reported by event in next period)
// setting alreadyBeenReported status in callbacks ensures that events are reported but can cause several "first" events to be reported
// TODO: currently activity states are reported on per session basis, but one physical device can have several sessions
// maybe it is a good idea to manage activity state on per device basis
private final class ActivityStateManager {
private final ConcurrentMap<UUID, ActivityState> sessionActivityStates = new ConcurrentHashMap<>();
@lombok.Value
// TODO: I chose to have immutable objects for concurrency considerations,
// but it can possibly create a performance issue with creating large amounts of new objects
// maybe mutable objects with volatile fields is better
private class ActivityState {
TransportProtos.SessionInfoProto sessionInfo;
long lastActivityTime;
boolean alreadyBeenReported;
long lastReportedTime;
ActivityState withSessionInfo(TransportProtos.SessionInfoProto sessionInfo) {
return new ActivityState(sessionInfo, getLastActivityTime(), isAlreadyBeenReported(), getLastReportedTime());
}
ActivityState withLastActivityTime(long lastActivityTime) {
return new ActivityState(getSessionInfo(), lastActivityTime, isAlreadyBeenReported(), getLastReportedTime());
}
ActivityState withReportedStatus(boolean hasAlreadyBeenReported) {
return new ActivityState(getSessionInfo(), getLastActivityTime(), hasAlreadyBeenReported, getLastReportedTime());
}
ActivityState withLastReportedTime(long lastReportedTime) {
return new ActivityState(getSessionInfo(), getLastActivityTime(), isAlreadyBeenReported(), lastReportedTime);
}
}
// TODO: maybe CAS-based synchronization policy will perform better?
// locks can put threads to sleep which introduces scheduling overhead (it is up to JVM to device if a thread will go to sleep or will be spin-waiting)
// activity management tasks are short lived so scheduling overhead may be significant, so CAS can be better since it does not put threads to sleep (always spin-waiting)
// Compare ReadWriteLock to CAS
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AtomicInteger currentPeriodId = new AtomicInteger();
private void init() {
scheduler.scheduleAtFixedRate(this::reportLastEventAndStartNewPeriod, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
}
private void recordActivity(TransportProtos.SessionInfoProto sessionInfo) {
long newLastActivityTime = System.currentTimeMillis();
var sessionId = toSessionId(sessionInfo);
lock.readLock().lock();
try {
sessionActivityStates.compute(sessionId, (id, currentActivityState) -> {
log.info("------------------------------------------------------------------------------------------------------------------------");
log.info("Record activity: entered compute! Session id: [{}]", sessionId);
var activityState = Objects.requireNonNullElseGet(currentActivityState, () -> new ActivityState(sessionInfo, newLastActivityTime, false, 0L));
if (activityState.isAlreadyBeenReported()) { // update the last activity time
log.info("------------------------------------------------------------------------------------------------------------------------");
return activityState.withLastActivityTime(newLastActivityTime);
}
int capturedPeriodId = currentPeriodId.get();
reportActivityStateToCore(activityState.getSessionInfo(), newLastActivityTime, new TransportServiceCallback<>() {
@Override
public void onSuccess(Void msgAcknowledged) {
// Success, the optimistic assumption was correct: update last reported time if it is newer
log.info("Record activity: successful callback received! Session id: [{}]", sessionId);
lock.readLock().lock();
try {
updateLastReportedTime(sessionId, newLastActivityTime);
} finally {
lock.readLock().unlock();
}
}
@Override
public void onError(Throwable e) {
// Error: revert alreadyBeenReported to false
log.info("Record activity: failed callback received! Session id: [{}]", sessionId);
log.debug("[{}] Failed to report last activity time!", sessionId, e);
lock.readLock().lock();
try {
sessionActivityStates.computeIfPresent(sessionId, (__, activityState) -> {
boolean updatedReportedStatus = activityState.isAlreadyBeenReported();
if (capturedPeriodId == currentPeriodId.get() && activityState.getLastReportedTime() < newLastActivityTime) {
updatedReportedStatus = false;
}
return activityState.withReportedStatus(updatedReportedStatus);
});
} finally {
lock.readLock().unlock();
}
}
});
log.info("------------------------------------------------------------------------------------------------------------------------");
// Optimistically set hasAlreadyBeenReported to true
return new ActivityState(sessionInfo, newLastActivityTime, true, activityState.getLastReportedTime());
});
} finally {
lock.readLock().unlock();
}
}
private void reportLastEventAndStartNewPeriod() {
lock.writeLock().lock();
// log.info("------------------------------------------------------------------------------------------------------------------------");
// log.info("Period change start! Current period id: [{}], activity states: {}", currentPeriodId.get(), sessionActivityStates.keySet());
try {
Set<UUID> activityStatesToRemove = new HashSet<>();
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
for (Map.Entry<UUID, ActivityState> entry : sessionActivityStates.entrySet()) {
var sessionId = entry.getKey();
var activityState = entry.getValue();
long lastActivityTime = activityState.getLastActivityTime();
// TODO: why this update is needed? if gateway session is present but this session is already gone we will not check if gateway's last activity time is greater than this one
SessionMetaData sessionMetaData = sessions.get(sessionId);
if (sessionMetaData != null) {
activityState = activityState.withSessionInfo(sessionMetaData.getSessionInfo());
entry.setValue(activityState);
} else {
// log.info("Removing activity state! Session id: [{}]", sessionId);
activityStatesToRemove.add(sessionId);
}
// TODO: ask about this part end
TransportProtos.SessionInfoProto sessionInfo = activityState.getSessionInfo();
if (sessionInfo.getGwSessionIdMSB() != 0 && sessionInfo.getGwSessionIdLSB() != 0) {
var gwSessionId = new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB());
SessionMetaData gwSessionMetaData = sessions.get(gwSessionId);
if (gwSessionMetaData != null && gwSessionMetaData.isOverwriteActivityTime()) {
ActivityState gwActivityState = sessionActivityStates.get(gwSessionId);
lastActivityTime = Math.max(gwActivityState.getLastActivityTime(), lastActivityTime);
}
}
if (sessionMetaData != null && lastActivityTime < expirationTime) {
// log.info("Session has expired! Session id: [{}]", sessionId);
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}!", sessionId, lastActivityTime);
}
sessions.remove(sessionId);
activityStatesToRemove.add(sessionId);
process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
} else if (activityState.getLastReportedTime() < lastActivityTime) {
long finalLastActivityTime = lastActivityTime;
reportActivityStateToCore(sessionInfo, sessionMetaData, lastActivityTime, new TransportServiceCallback<>() {
@Override
public void onSuccess(Void msgAcknowledged) {
lock.readLock().lock();
try {
updateLastReportedTime(sessionId, finalLastActivityTime);
} finally {
lock.readLock().unlock();
}
}
@Override
public void onError(Throwable e) {
log.debug("[{}] Failed to report last activity time!", sessionId, e);
}
});
}
entry.setValue(activityState.withReportedStatus(false));
}
activityStatesToRemove.forEach(sessionActivityStates::remove);
} finally {
currentPeriodId.incrementAndGet();
// log.info("Period change end! Current period id: [{}], activity states {}", currentPeriodId.get(), sessionActivityStates.keySet());
// log.info("------------------------------------------------------------------------------------------------------------------------");
lock.writeLock().unlock();
}
}
private void reportActivityStateToCore(TransportProtos.SessionInfoProto sessionInfo, long lastActivityTime, TransportServiceCallback<Void> msgAcknowledgedCallback) {
SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo));
reportActivityStateToCore(sessionInfo, sessionMetaData, lastActivityTime, msgAcknowledgedCallback);
}
private void reportActivityStateToCore(
TransportProtos.SessionInfoProto sessionInfo, SessionMetaData sessionMetaData, long lastActivityTime, TransportServiceCallback<Void> msgAcknowledgedCallback
) {
log.info("Reporting activity state to core! Session id: [{}], last activity time: [{}], current period id: [{}]",
toSessionId(sessionInfo), lastActivityTime, currentPeriodId.get());
TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToAttributes())
.setRpcSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToRPC())
.setLastActivityTime(lastActivityTime)
.build();
process(sessionInfo, subscriptionInfo, msgAcknowledgedCallback);
}
private void updateLastReportedTime(UUID sessionId, long lastActivityTime) {
sessionActivityStates.computeIfPresent(sessionId, (__, currentActivityState) -> {
long updatedLastReportedTime = Math.max(currentActivityState.getLastReportedTime(), lastActivityTime);
return currentActivityState.withLastReportedTime(updatedLastReportedTime);
});
}
}
@Override
public void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error) {