Session Activity reporting is isolated and supports short-lived sessions

This commit is contained in:
Andrii Shvaika 2021-08-19 17:09:22 +03:00 committed by Andrew Shvayka
parent 327607e86d
commit a51c00bd10
13 changed files with 123 additions and 69 deletions

View File

@ -638,20 +638,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
UUID sessionId = getSessionId(sessionInfoProto);
Objects.requireNonNull(sessionId);
SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId,
id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), subscriptionInfo.getLastActivityTime()));
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
if (subscriptionInfo.getAttributeSubscription()) {
attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
if (subscriptionInfo.getRpcSubscription()) {
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
SessionInfoMetaData sessionMD = sessions.get(sessionId);
if (sessionMD != null) {
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
if (subscriptionInfo.getAttributeSubscription()) {
attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
if (subscriptionInfo.getRpcSubscription()) {
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
}
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
dumpSessions();
if (sessionMD != null) {
dumpSessions();
}
}
void processCredentialsUpdate(TbActorMsg msg) {

View File

@ -563,7 +563,7 @@ js:
transport:
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"

View File

@ -115,7 +115,6 @@ public class DeviceApiController implements TbTransportService {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}
@ -129,7 +128,6 @@ public class DeviceApiController implements TbTransportService {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}
@ -419,14 +417,6 @@ public class DeviceApiController implements TbTransportService {
}
private void reportActivity(SessionInfoProto sessionInfo) {
transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(false)
.setRpcSubscription(false)
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
}
private static MediaType parseMediaType(String contentType) {
try {
return MediaType.parseMediaType(contentType);

View File

@ -908,7 +908,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
* @param sessionInfo -
*/
private void reportActivityAndRegister(SessionInfoProto sessionInfo) {
if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) {
if (sessionInfo != null && !transportService.hasSession(sessionInfo)) {
sessionManager.register(sessionInfo);
this.reportActivitySubscription(sessionInfo);
}

View File

@ -126,7 +126,7 @@ public interface TransportService {
SessionMetaData registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout);
SessionMetaData reportActivity(SessionInfoProto sessionInfo);
void reportActivity(SessionInfoProto sessionInfo);
void deregisterSession(SessionInfoProto sessionInfo);
@ -135,4 +135,6 @@ public interface TransportService {
void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback<Void> empty);
ExecutorService getCallbackExecutor();
boolean hasSession(SessionInfoProto sessionInfo);
}

View File

@ -95,10 +95,12 @@ import org.thingsboard.server.queue.util.TbTransportComponent;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -162,6 +164,7 @@ public class DefaultTransportService implements TransportService {
private ExecutorService mainConsumerExecutor;
private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
private final ConcurrentMap<UUID, SessionActivityData> sessionsActivity = new ConcurrentHashMap<>();
private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
private volatile boolean stopped = false;
@ -545,8 +548,11 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo));
if (sessionMetaData != null) {
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
}
reportActivityInternal(sessionInfo);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(),
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
}
@ -555,8 +561,11 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo));
if (sessionMetaData != null) {
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
}
reportActivityInternal(sessionInfo);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(),
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback));
}
@ -668,52 +677,61 @@ public class DefaultTransportService implements TransportService {
}
@Override
public SessionMetaData reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
return reportActivityInternal(sessionInfo);
public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
reportActivityInternal(sessionInfo);
}
private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
private void reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
UUID sessionId = toSessionId(sessionInfo);
SessionMetaData sessionMetaData = sessions.get(sessionId);
if (sessionMetaData != null) {
sessionMetaData.updateLastActivityTime();
}
return sessionMetaData;
SessionActivityData sessionMetaData = sessionsActivity.computeIfAbsent(sessionId, id -> new SessionActivityData(sessionInfo));
sessionMetaData.updateLastActivityTime();
}
private void checkInactivityAndReportActivity() {
long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
sessions.forEach((uuid, sessionMD) -> {
long lastActivityTime = sessionMD.getLastActivityTime();
TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
if (sessionInfo.getGwSessionIdMSB() != 0 &&
sessionInfo.getGwSessionIdLSB() != 0) {
SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
Set<UUID> sessionsToRemove = new HashSet<>();
sessionsActivity.forEach((uuid, sessionAD) -> {
long lastActivityTime = sessionAD.getLastActivityTime();
SessionMetaData sessionMD = sessions.get(uuid);
if (sessionMD != null) {
sessionAD.setSessionInfo(sessionMD.getSessionInfo());
} else {
sessionsToRemove.add(uuid);
}
TransportProtos.SessionInfoProto sessionInfo = sessionAD.getSessionInfo();
if (sessionInfo.getGwSessionIdMSB() != 0 && sessionInfo.getGwSessionIdLSB() != 0) {
var gwSessionId = new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB());
SessionMetaData gwMetaData = sessions.get(gwSessionId);
SessionActivityData gwActivityData = sessionsActivity.get(gwSessionId);
if (gwMetaData != null && gwMetaData.isOverwriteActivityTime()) {
lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
lastActivityTime = Math.max(gwActivityData.getLastActivityTime(), lastActivityTime);
}
}
if (lastActivityTime < expTime) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
if (sessionMD != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
}
sessions.remove(uuid);
sessionsToRemove.add(uuid);
process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto
.newBuilder()
.setMessage("Session has expired due to last activity time!")
.build();
sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto);
}
process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
sessions.remove(uuid);
TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto
.newBuilder()
.setMessage("session has expired due to last activity time!")
.build();
sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto);
} else {
if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
if (lastActivityTime > sessionAD.getLastReportedActivityTime()) {
final long lastActivityTimeFinal = lastActivityTime;
process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(sessionMD.isSubscribedToAttributes())
.setRpcSubscription(sessionMD.isSubscribedToRPC())
.setAttributeSubscription(sessionMD != null && sessionMD.isSubscribedToAttributes())
.setRpcSubscription(sessionMD != null && sessionMD.isSubscribedToRPC())
.setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {
sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
sessionAD.setLastReportedActivityTime(lastActivityTimeFinal);
}
@Override
@ -724,6 +742,8 @@ public class DefaultTransportService implements TransportService {
}
}
});
// Removes all closed or short-lived sessions.
sessionsToRemove.forEach(sessionsActivity::remove);
}
@Override
@ -1146,4 +1166,9 @@ public class DefaultTransportService implements TransportService {
public ExecutorService getCallbackExecutor() {
return transportCallbackExecutor;
}
@Override
public boolean hasSession(TransportProtos.SessionInfoProto sessionInfo) {
return sessions.containsKey(toSessionId(sessionInfo));
}
}

View File

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.service;
import lombok.Data;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.concurrent.ScheduledFuture;
/**
* Created by ashvayka on 15.10.18.
*/
@Data
public class SessionActivityData {
private volatile TransportProtos.SessionInfoProto sessionInfo;
private volatile long lastActivityTime;
private volatile long lastReportedActivityTime;
SessionActivityData(TransportProtos.SessionInfoProto sessionInfo) {
this.sessionInfo = sessionInfo;
}
void updateLastActivityTime() {
this.lastActivityTime = System.currentTimeMillis();
}
}

View File

@ -32,8 +32,6 @@ public class SessionMetaData {
private final SessionMsgListener listener;
private volatile ScheduledFuture scheduledFuture;
private volatile long lastActivityTime;
private volatile long lastReportedActivityTime;
private volatile boolean subscribedToAttributes;
private volatile boolean subscribedToRPC;
private volatile boolean overwriteActivityTime;
@ -42,14 +40,9 @@ public class SessionMetaData {
this.sessionInfo = sessionInfo;
this.sessionType = sessionType;
this.listener = listener;
this.lastActivityTime = System.currentTimeMillis();
this.scheduledFuture = null;
}
void updateLastActivityTime() {
this.lastActivityTime = System.currentTimeMillis();
}
void setScheduledFuture(ScheduledFuture scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}

View File

@ -113,7 +113,7 @@ transport:
dtls_session_report_timeout: "${TB_COAP_X509_DTLS_SESSION_REPORT_TIMEOUT:1800000}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"

View File

@ -85,7 +85,7 @@ transport:
max_request_timeout: "${HTTP_MAX_REQUEST_TIMEOUT:300000}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"

View File

@ -87,7 +87,7 @@ redis:
transport:
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:false}"

View File

@ -118,7 +118,7 @@ transport:
skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"

View File

@ -50,7 +50,7 @@ transport:
underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}"
sessions:
inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON
type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"