diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index adbd99a545..6e3f7b8233 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -638,20 +638,22 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { UUID sessionId = getSessionId(sessionInfoProto); Objects.requireNonNull(sessionId); - SessionInfoMetaData sessionMD = sessions.computeIfAbsent(sessionId, - id -> new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId()), subscriptionInfo.getLastActivityTime())); - - sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime()); - sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription()); - sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription()); - if (subscriptionInfo.getAttributeSubscription()) { - attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); - } - if (subscriptionInfo.getRpcSubscription()) { - rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD != null) { + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime()); + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription()); + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription()); + if (subscriptionInfo.getAttributeSubscription()) { + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); + } + if (subscriptionInfo.getRpcSubscription()) { + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo()); + } } systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime()); - dumpSessions(); + if (sessionMD != null) { + dumpSessions(); + } } void processCredentialsUpdate(TbActorMsg msg) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 74572e8ccc..ced335aea4 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -563,7 +563,7 @@ js: transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index aab76e350c..cfb48656ed 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -115,7 +115,6 @@ public class DeviceApiController implements TbTransportService { TransportService transportService = transportContext.getTransportService(); transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)), new HttpOkCallback(responseWriter)); - reportActivity(sessionInfo); })); return responseWriter; } @@ -129,7 +128,6 @@ public class DeviceApiController implements TbTransportService { TransportService transportService = transportContext.getTransportService(); transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)), new HttpOkCallback(responseWriter)); - reportActivity(sessionInfo); })); return responseWriter; } @@ -419,14 +417,6 @@ public class DeviceApiController implements TbTransportService { } - private void reportActivity(SessionInfoProto sessionInfo) { - transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(false) - .setRpcSubscription(false) - .setLastActivityTime(System.currentTimeMillis()) - .build(), TransportServiceCallback.EMPTY); - } - private static MediaType parseMediaType(String contentType) { try { return MediaType.parseMediaType(contentType); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 58201f344e..daa1152728 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -908,7 +908,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl * @param sessionInfo - */ private void reportActivityAndRegister(SessionInfoProto sessionInfo) { - if (sessionInfo != null && transportService.reportActivity(sessionInfo) == null) { + if (sessionInfo != null && !transportService.hasSession(sessionInfo)) { sessionManager.register(sessionInfo); this.reportActivitySubscription(sessionInfo); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 237954c553..91f3c2ef02 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -126,7 +126,7 @@ public interface TransportService { SessionMetaData registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); - SessionMetaData reportActivity(SessionInfoProto sessionInfo); + void reportActivity(SessionInfoProto sessionInfo); void deregisterSession(SessionInfoProto sessionInfo); @@ -135,4 +135,6 @@ public interface TransportService { void notifyAboutUplink(SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg build, TransportServiceCallback empty); ExecutorService getCallbackExecutor(); + + boolean hasSession(SessionInfoProto sessionInfo); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index c705d8364a..16317b51ea 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -95,10 +95,12 @@ import org.thingsboard.server.queue.util.TbTransportComponent; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -162,6 +164,7 @@ public class DefaultTransportService implements TransportService { private ExecutorService mainConsumerExecutor; private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + private final ConcurrentMap sessionsActivity = new ConcurrentHashMap<>(); private final Map toServerRpcPendingMap = new ConcurrentHashMap<>(); private volatile boolean stopped = false; @@ -545,8 +548,11 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); - sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); + SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo)); + if (sessionMetaData != null) { + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); + } + reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } @@ -555,8 +561,11 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { - SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); - sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); + SessionMetaData sessionMetaData = sessions.get(toSessionId(sessionInfo)); + if (sessionMetaData != null) { + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); + } + reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } @@ -668,52 +677,61 @@ public class DefaultTransportService implements TransportService { } @Override - public SessionMetaData reportActivity(TransportProtos.SessionInfoProto sessionInfo) { - return reportActivityInternal(sessionInfo); + public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { + reportActivityInternal(sessionInfo); } - private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { + private void reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { UUID sessionId = toSessionId(sessionInfo); - SessionMetaData sessionMetaData = sessions.get(sessionId); - if (sessionMetaData != null) { - sessionMetaData.updateLastActivityTime(); - } - return sessionMetaData; + SessionActivityData sessionMetaData = sessionsActivity.computeIfAbsent(sessionId, id -> new SessionActivityData(sessionInfo)); + sessionMetaData.updateLastActivityTime(); } private void checkInactivityAndReportActivity() { long expTime = System.currentTimeMillis() - sessionInactivityTimeout; - sessions.forEach((uuid, sessionMD) -> { - long lastActivityTime = sessionMD.getLastActivityTime(); - TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo(); - if (sessionInfo.getGwSessionIdMSB() != 0 && - sessionInfo.getGwSessionIdLSB() != 0) { - SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB())); + Set sessionsToRemove = new HashSet<>(); + sessionsActivity.forEach((uuid, sessionAD) -> { + long lastActivityTime = sessionAD.getLastActivityTime(); + SessionMetaData sessionMD = sessions.get(uuid); + if (sessionMD != null) { + sessionAD.setSessionInfo(sessionMD.getSessionInfo()); + } else { + sessionsToRemove.add(uuid); + } + TransportProtos.SessionInfoProto sessionInfo = sessionAD.getSessionInfo(); + + if (sessionInfo.getGwSessionIdMSB() != 0 && sessionInfo.getGwSessionIdLSB() != 0) { + var gwSessionId = new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()); + SessionMetaData gwMetaData = sessions.get(gwSessionId); + SessionActivityData gwActivityData = sessionsActivity.get(gwSessionId); if (gwMetaData != null && gwMetaData.isOverwriteActivityTime()) { - lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime); + lastActivityTime = Math.max(gwActivityData.getLastActivityTime(), lastActivityTime); } } if (lastActivityTime < expTime) { - if (log.isDebugEnabled()) { - log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime); + if (sessionMD != null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime); + } + sessions.remove(uuid); + sessionsToRemove.add(uuid); + process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto + .newBuilder() + .setMessage("Session has expired due to last activity time!") + .build(); + sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto); } - process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); - sessions.remove(uuid); - TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto - .newBuilder() - .setMessage("session has expired due to last activity time!") - .build(); - sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto); } else { - if (lastActivityTime > sessionMD.getLastReportedActivityTime()) { + if (lastActivityTime > sessionAD.getLastReportedActivityTime()) { final long lastActivityTimeFinal = lastActivityTime; process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() - .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) - .setRpcSubscription(sessionMD.isSubscribedToRPC()) + .setAttributeSubscription(sessionMD != null && sessionMD.isSubscribedToAttributes()) + .setRpcSubscription(sessionMD != null && sessionMD.isSubscribedToRPC()) .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback() { @Override public void onSuccess(Void msg) { - sessionMD.setLastReportedActivityTime(lastActivityTimeFinal); + sessionAD.setLastReportedActivityTime(lastActivityTimeFinal); } @Override @@ -724,6 +742,8 @@ public class DefaultTransportService implements TransportService { } } }); + // Removes all closed or short-lived sessions. + sessionsToRemove.forEach(sessionsActivity::remove); } @Override @@ -1146,4 +1166,9 @@ public class DefaultTransportService implements TransportService { public ExecutorService getCallbackExecutor() { return transportCallbackExecutor; } + + @Override + public boolean hasSession(TransportProtos.SessionInfoProto sessionInfo) { + return sessions.containsKey(toSessionId(sessionInfo)); + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionActivityData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionActivityData.java new file mode 100644 index 0000000000..fac0e3ad52 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionActivityData.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import lombok.Data; +import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.concurrent.ScheduledFuture; + +/** + * Created by ashvayka on 15.10.18. + */ +@Data +public class SessionActivityData { + + private volatile TransportProtos.SessionInfoProto sessionInfo; + private volatile long lastActivityTime; + private volatile long lastReportedActivityTime; + + SessionActivityData(TransportProtos.SessionInfoProto sessionInfo) { + this.sessionInfo = sessionInfo; + } + + void updateLastActivityTime() { + this.lastActivityTime = System.currentTimeMillis(); + } + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java index bb0ed7aa58..f56f7161c5 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -32,8 +32,6 @@ public class SessionMetaData { private final SessionMsgListener listener; private volatile ScheduledFuture scheduledFuture; - private volatile long lastActivityTime; - private volatile long lastReportedActivityTime; private volatile boolean subscribedToAttributes; private volatile boolean subscribedToRPC; private volatile boolean overwriteActivityTime; @@ -42,14 +40,9 @@ public class SessionMetaData { this.sessionInfo = sessionInfo; this.sessionType = sessionType; this.listener = listener; - this.lastActivityTime = System.currentTimeMillis(); this.scheduledFuture = null; } - void updateLastActivityTime() { - this.lastActivityTime = System.currentTimeMillis(); - } - void setScheduledFuture(ScheduledFuture scheduledFuture) { this.scheduledFuture = scheduledFuture; } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index c53dca9e3f..0f66f28efb 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -113,7 +113,7 @@ transport: dtls_session_report_timeout: "${TB_COAP_X509_DTLS_SESSION_REPORT_TIMEOUT:1800000}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index b34608d751..8840cc8e8b 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -85,7 +85,7 @@ transport: max_request_timeout: "${HTTP_MAX_REQUEST_TIMEOUT:300000}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 5be80bd73e..d398b676dd 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -87,7 +87,7 @@ redis: transport: sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:false}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 82f59f0ce3..2191508636 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -118,7 +118,7 @@ transport: skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 7de8b4021b..3453a7ace1 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -50,7 +50,7 @@ transport: underlying_protocol: "${SNMP_UNDERLYING_PROTOCOL:udp}" sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" - report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" json: # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON type_cast_enabled: "${JSON_TYPE_CAST_ENABLED:true}"