From 8ab4f144b2713b2692f7b260918825f9258b107d Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 29 Oct 2018 18:31:14 +0200 Subject: [PATCH] Implementation of Session timeouts --- .gitignore | 1 + .../server/actors/ActorSystemContext.java | 13 ++ .../server/actors/device/DeviceActor.java | 17 +- .../device/DeviceActorMessageProcessor.java | 148 +++++++++++++----- .../server/actors/device/SessionInfo.java | 1 + .../actors/device/SessionInfoMetaData.java | 39 +++++ .../actors/device/SessionTimeoutCheckMsg.java | 39 +++++ .../AbstractContextAwareMsgProcessor.java | 24 +-- .../server/actors/tenant/TenantActor.java | 1 - .../DefaultDeviceSessionCacheService.java | 50 ++++++ .../session/DeviceSessionCacheService.java | 30 ++++ .../transport/LocalTransportService.java | 7 + .../src/main/resources/thingsboard.yml | 6 + .../server/common/data/CacheConstants.java | 1 + .../server/common/msg/MsgType.java | 7 +- .../transport/mqtt/MqttTransportHandler.java | 19 +-- .../mqtt/session/GatewaySessionHandler.java | 13 +- .../common/transport/TransportService.java | 4 + .../service/AbstractTransportService.java | 90 ++++++++++- .../service/RemoteTransportService.java | 11 ++ .../transport/service/SessionMetaData.java | 17 +- .../src/main/resources/tb-coap-transport.yml | 3 + .../src/main/resources/tb-http-transport.yml | 3 + .../src/main/resources/tb-mqtt-transport.yml | 4 +- 24 files changed, 463 insertions(+), 85 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java create mode 100644 application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java create mode 100644 application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java diff --git a/.gitignore b/.gitignore index 9039e8ec4d..da8569baa3 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ pom.xml.versionsBackup **/Californium.properties **/.env .instance_id +rebuild-docker.sh diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 5e19d690c4..091b504ea7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -67,6 +67,7 @@ import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.rpc.DeviceRpcService; import org.thingsboard.server.service.script.JsExecutorService; import org.thingsboard.server.service.script.JsInvokeService; +import org.thingsboard.server.service.session.DeviceSessionCacheService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.transport.RuleEngineTransportService; @@ -201,6 +202,10 @@ public class ActorSystemContext { @Getter private DeviceStateService deviceStateService; + @Autowired + @Getter + private DeviceSessionCacheService deviceSessionCacheService; + @Lazy @Autowired @Getter @@ -254,6 +259,14 @@ public class ActorSystemContext { @Getter private boolean allowSystemMailService; + @Value("${transport.sessions.inactivity_timeout}") + @Getter + private long sessionInactivityTimeout; + + @Value("${transport.sessions.report_timeout}") + @Getter + private long sessionReportTimeout; + @Getter @Setter private ActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index bd2a0f4486..7b412b1328 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -43,12 +43,20 @@ public class DeviceActor extends ContextAwareActor { this.processor = new DeviceActorMessageProcessor(systemContext, logger, tenantId, deviceId); } + @Override + public void preStart() { + logger.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId); + try { + processor.initSessionTimeout(context()); + logger.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId); + } catch (Exception e) { + logger.error(e, "[{}][{}] Unknown failure", processor.tenantId, processor.deviceId); + } + } + @Override protected boolean process(TbActorMsg msg) { switch (msg.getMsgType()) { - case CLUSTER_EVENT_MSG: - processor.processClusterEventMsg((ClusterEventMsg) msg); - break; case TRANSPORT_TO_DEVICE_ACTOR_MSG: processor.process(context(), (TransportToDeviceActorMsgWrapper) msg); break; @@ -73,6 +81,9 @@ public class DeviceActor extends ContextAwareActor { case DEVICE_ACTOR_CLIENT_SIDE_RPC_TIMEOUT_MSG: processor.processClientSideRpcTimeout(context(), (DeviceActorClientSideRpcTimeoutMsg) msg); break; + case SESSION_TIMEOUT_MSG: + processor.checkSessionsTimeout(); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 021f26207a..08a4454809 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -88,11 +88,11 @@ import java.util.stream.Collectors; /** * @author Andrew Shvayka */ -public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { +class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { - private final TenantId tenantId; - private final DeviceId deviceId; - private final Map sessions; + final TenantId tenantId; + final DeviceId deviceId; + private final Map sessions; private final Map attributeSubscriptions; private final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; @@ -116,6 +116,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.toDeviceRpcPendingMap = new HashMap<>(); this.toServerRpcPendingMap = new HashMap<>(); initAttributes(); + restoreSessions(); } private void initAttributes() { @@ -160,7 +161,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } else { logger.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId()); } - } private void registerPendingRpcRequest(ActorContext context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) { @@ -174,7 +174,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (requestMd != null) { logger.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId()); systemContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); + null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION)); } } @@ -227,11 +227,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } if (msg.hasPostAttributes()) { handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); - reportActivity(); + reportLogicalDeviceActivity(); } if (msg.hasPostTelemetry()) { handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry()); - reportActivity(); + reportLogicalDeviceActivity(); } if (msg.hasGetAttributes()) { handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); @@ -241,11 +241,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } if (msg.hasToServerRPCCallRequest()) { handleClientSideRPCRequest(context, msg.getSessionInfo(), msg.getToServerRPCCallRequest()); - reportActivity(); + reportLogicalDeviceActivity(); + } + if (msg.hasSubscriptionInfo()) { + handleSessionActivity(context, msg.getSessionInfo(), msg.getSubscriptionInfo()); } } - private void reportActivity() { + private void reportLogicalDeviceActivity() { systemContext.getDeviceStateService().onDeviceActivity(deviceId); } @@ -406,28 +409,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - void processClusterEventMsg(ClusterEventMsg msg) { -// if (!msg.isAdded()) { -// logger.debug("[{}] Clearing attributes/rpc subscription for server [{}]", deviceId, msg.getServerAddress()); -// Predicate> filter = e -> e.getValue().getServer() -// .map(serverAddress -> serverAddress.equals(msg.getServerAddress())).orElse(false); -// attributeSubscriptions.entrySet().removeIf(filter); -// rpcSubscriptions.entrySet().removeIf(filter); -// } - } - private void processSubscriptionCommands(ActorContext context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) { UUID sessionId = getSessionId(sessionInfo); if (subscribeCmd.getUnsubscribe()) { logger.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId); attributeSubscriptions.remove(sessionId); } else { - SessionInfo session = sessions.get(sessionId); - if (session == null) { - session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId())); } + sessionMD.setSubscribedToAttributes(true); logger.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId); - attributeSubscriptions.put(sessionId, session); + attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo()); + dumpSessions(); } } @@ -441,20 +436,22 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso logger.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId); rpcSubscriptions.remove(sessionId); } else { - SessionInfo session = sessions.get(sessionId); - if (session == null) { - session = new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId()); + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD == null) { + sessionMD = new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.SYNC, sessionInfo.getNodeId())); } + sessionMD.setSubscribedToRPC(true); logger.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId); - rpcSubscriptions.put(sessionId, session); + rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo()); sendPendingRequests(context, sessionId, sessionInfo); + dumpSessions(); } } private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { UUID sessionId = getSessionId(sessionInfo); if (msg.getEvent() == SessionEvent.OPEN) { - if(sessions.containsKey(sessionId)){ + if (sessions.containsKey(sessionId)) { logger.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId); return; } @@ -462,13 +459,14 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); if (sessionIdToRemove != null) { - closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); + notifyTransportAboutClosedSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); } } - sessions.put(sessionId, new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId())); + sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfo.getNodeId()))); if (sessions.size() == 1) { reportSessionOpen(); } + dumpSessions(); } else if (msg.getEvent() == SessionEvent.CLOSED) { logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); sessions.remove(sessionId); @@ -477,21 +475,34 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (sessions.isEmpty()) { reportSessionClose(); } + dumpSessions(); } } - void processCredentialsUpdate() { - sessions.forEach(this::closeSession); - attributeSubscriptions.clear(); - rpcSubscriptions.clear(); + private void handleSessionActivity(ActorContext context, SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto subscriptionInfo) { + UUID sessionId = getSessionId(sessionInfo); + SessionInfoMetaData sessionMD = sessions.get(sessionId); + if (sessionMD != null) { + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime()); + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription()); + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription()); + } + dumpSessions(); } - private void closeSession(UUID sessionId, SessionInfo sessionInfo) { + void processCredentialsUpdate() { + sessions.forEach(this::notifyTransportAboutClosedSession); + attributeSubscriptions.clear(); + rpcSubscriptions.clear(); + dumpSessions(); + } + + private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd) { DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build(); - systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); + systemContext.getRuleEngineTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg); } void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) { @@ -605,4 +616,67 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } return builder.build(); } + + private void restoreSessions() { + TransportProtos.DeviceSessionsCacheEntry sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId); + if (sessionsDump.getSerializedSize() == 0) { + return; + } + for (TransportProtos.SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) { + SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo(); + UUID sessionId = getSessionId(sessionInfoProto); + SessionInfo sessionInfo = new SessionInfo(TransportProtos.SessionType.ASYNC, sessionInfoProto.getNodeId()); + TransportProtos.SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo(); + SessionInfoMetaData sessionInfoMetaData = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime()); + sessions.put(sessionId, sessionInfoMetaData); + if (subInfo.getAttributeSubscription()) { + rpcSubscriptions.put(sessionId, sessionInfo); + } + if (subInfo.getAttributeSubscription()) { + attributeSubscriptions.put(sessionId, sessionInfo); + } + } + } + + private void dumpSessions() { + List sessionsList = new ArrayList<>(sessions.size()); + sessions.forEach((uuid, sessionMD) -> { + if (sessionMD.getSessionInfo().getType() == TransportProtos.SessionType.SYNC) { + return; + } + SessionInfo sessionInfo = sessionMD.getSessionInfo(); + TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() + .setLastActivityTime(sessionMD.getLastActivityTime()) + .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) + .setRpcSubscription(sessionMD.isSubscribedToRPC()).build(); + TransportProtos.SessionInfoProto sessionInfoProto = TransportProtos.SessionInfoProto.newBuilder() + .setSessionIdMSB(uuid.getMostSignificantBits()) + .setSessionIdLSB(uuid.getLeastSignificantBits()) + .setNodeId(sessionInfo.getNodeId()).build(); + sessionsList.add(TransportProtos.SessionSubscriptionInfoProto.newBuilder() + .setSessionInfo(sessionInfoProto) + .setSubscriptionInfo(subscriptionInfoProto).build()); + }); + systemContext.getDeviceSessionCacheService() + .put(deviceId, TransportProtos.DeviceSessionsCacheEntry.newBuilder() + .addAllSessions(sessionsList).build()); + } + + void initSessionTimeout(ActorContext context) { + schedulePeriodicMsgWithDelay(context, SessionTimeoutCheckMsg.instance(), systemContext.getSessionInactivityTimeout(), systemContext.getSessionInactivityTimeout()); + } + + void checkSessionsTimeout() { + long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout(); + Map sessionsToRemove = sessions.entrySet().stream().filter(kv -> kv.getValue().getLastActivityTime() < expTime).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + sessionsToRemove.forEach((sessionId, sessionMD) -> { + sessions.remove(sessionId); + rpcSubscriptions.remove(sessionId); + attributeSubscriptions.remove(sessionId); + notifyTransportAboutClosedSession(sessionId, sessionMD); + }); + if (!sessionsToRemove.isEmpty()) { + dumpSessions(); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java index 43ae592cee..fe09077af1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java @@ -25,4 +25,5 @@ import org.thingsboard.server.gen.transport.TransportProtos.SessionType; public class SessionInfo { private final SessionType type; private final String nodeId; + private long lastActivityTime; } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java new file mode 100644 index 0000000000..dd0839486f --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfoMetaData.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.device; + +import lombok.Data; +import org.thingsboard.server.gen.transport.TransportProtos.SessionType; + +/** + * @author Andrew Shvayka + */ +@Data +class SessionInfoMetaData { + private final SessionInfo sessionInfo; + private long lastActivityTime; + private boolean subscribedToAttributes; + private boolean subscribedToRPC; + + SessionInfoMetaData(SessionInfo sessionInfo) { + this(sessionInfo, System.currentTimeMillis()); + } + + SessionInfoMetaData(SessionInfo sessionInfo, long lastActivityTime) { + this.sessionInfo = sessionInfo; + this.lastActivityTime = lastActivityTime; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java new file mode 100644 index 0000000000..d9172ae825 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionTimeoutCheckMsg.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.device; + +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorMsg; + +/** + * Created by ashvayka on 29.10.18. + */ +public class SessionTimeoutCheckMsg implements TbActorMsg { + + private static final SessionTimeoutCheckMsg INSTANCE = new SessionTimeoutCheckMsg(); + + private SessionTimeoutCheckMsg() { + } + + public static SessionTimeoutCheckMsg instance() { + return INSTANCE; + } + + @Override + public MsgType getMsgType() { + return MsgType.SESSION_TIMEOUT_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java index 8864486150..c8097821d9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/shared/AbstractContextAwareMsgProcessor.java @@ -40,43 +40,31 @@ public abstract class AbstractContextAwareMsgProcessor { this.logger = logger; } - protected ActorRef getAppActor() { - return systemContext.getAppActor(); - } - - protected Scheduler getScheduler() { + private Scheduler getScheduler() { return systemContext.getScheduler(); } - protected ExecutionContextExecutor getSystemDispatcher() { + private ExecutionContextExecutor getSystemDispatcher() { return systemContext.getActorSystem().dispatcher(); } protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs) { - schedulePeriodicMsgWithDelay(ctx, msg, delayInMs, periodInMs, ctx.self()); + schedulePeriodicMsgWithDelay(msg, delayInMs, periodInMs, ctx.self()); } - protected void schedulePeriodicMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, long periodInMs, ActorRef target) { + private void schedulePeriodicMsgWithDelay(Object msg, long delayInMs, long periodInMs, ActorRef target) { logger.debug("Scheduling periodic msg {} every {} ms with delay {} ms", msg, periodInMs, delayInMs); getScheduler().schedule(Duration.create(delayInMs, TimeUnit.MILLISECONDS), Duration.create(periodInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); } - protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs) { - scheduleMsgWithDelay(ctx, msg, delayInMs, ctx.self()); + scheduleMsgWithDelay(msg, delayInMs, ctx.self()); } - protected void scheduleMsgWithDelay(ActorContext ctx, Object msg, long delayInMs, ActorRef target) { + private void scheduleMsgWithDelay(Object msg, long delayInMs, ActorRef target) { logger.debug("Scheduling msg {} with delay {} ms", msg, delayInMs); getScheduler().scheduleOnce(Duration.create(delayInMs, TimeUnit.MILLISECONDS), target, msg, getSystemDispatcher(), null); } - @Data - @AllArgsConstructor - private static class ComponentConfiguration { - private final String clazz; - private final String name; - private final String configuration; - } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 347483a185..f822e4e1f0 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -127,7 +127,6 @@ public class TenantActor extends RuleChainManagerActor { ruleChainManager.getOrCreateActor(context(), msg.getRuleChainId()).tell(msg, self()); } - private void onToDeviceActorMsg(DeviceAwareMsg msg) { getOrCreateDeviceActor(msg.getDeviceId()).tell(msg, ActorRef.noSender()); } diff --git a/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java new file mode 100644 index 0000000000..6201dabdda --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/session/DefaultDeviceSessionCacheService.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.session; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.cache.annotation.CachePut; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; + +import java.util.ArrayList; +import java.util.Collections; + +import static org.thingsboard.server.common.data.CacheConstants.SESSIONS_CACHE; + +/** + * Created by ashvayka on 29.10.18. + */ +@Service +@Slf4j +public class DefaultDeviceSessionCacheService implements DeviceSessionCacheService { + + @Override + @Cacheable(cacheNames = SESSIONS_CACHE, key = "#deviceId") + public DeviceSessionsCacheEntry get(DeviceId deviceId) { + log.debug("[{}] Fetching session data from cache", deviceId); + return DeviceSessionsCacheEntry.newBuilder().addAllSessions(Collections.emptyList()).build(); + } + + @Override + @CachePut(cacheNames = SESSIONS_CACHE, key = "#deviceId") + public DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions) { + log.debug("[{}] Pushing session data from cache: {}", deviceId, sessions); + return sessions; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java new file mode 100644 index 0000000000..0a2e6a52e5 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/session/DeviceSessionCacheService.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.session; + +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry; + +/** + * Created by ashvayka on 29.10.18. + */ +public interface DeviceSessionCacheService { + + DeviceSessionsCacheEntry get(DeviceId deviceId); + + DeviceSessionsCacheEntry put(DeviceId deviceId, DeviceSessionsCacheEntry sessions); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index 0f722306ff..d3782e8ffc 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -160,6 +160,13 @@ public class LocalTransportService extends AbstractTransportService implements R } } + @Override + public void process(SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback) { + if (checkLimits(sessionInfo, callback)) { + forwardToDeviceActor(TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback); + } + } + @Override public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, callback)) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 54096eaaa1..58242933aa 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -202,6 +202,9 @@ caffeine: devices: timeToLiveInMinutes: 1440 maxSize: 100000 + sessions: + timeToLiveInMinutes: 1440 + maxSize: 100000 assets: timeToLiveInMinutes: 1440 maxSize: 100000 @@ -392,6 +395,9 @@ transport: auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}" notifications: topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" + sessions: + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" rate_limits: enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index 698a69ef6b..853caff8d5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -19,6 +19,7 @@ public class CacheConstants { public static final String DEVICE_CREDENTIALS_CACHE = "deviceCredentials"; public static final String RELATIONS_CACHE = "relations"; public static final String DEVICE_CACHE = "devices"; + public static final String SESSIONS_CACHE = "sessions"; public static final String ASSET_CACHE = "assets"; public static final String ENTITY_VIEW_CACHE = "entityViews"; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 24758b5b36..44a98d6ee4 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -96,13 +96,8 @@ public enum MsgType { */ DEVICE_ACTOR_TO_RULE_ENGINE_MSG, - /** - * Message that is sent from Rule Engine to the Device Actor when message is successfully pushed to queue. - */ - ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG, - TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, - SESSION_CTRL_MSG, + STATS_PERSIST_TICK_MSG, diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index da8b3a62ca..a178bdf891 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -43,10 +43,10 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.msg.EncryptionUtil; +import org.thingsboard.server.common.transport.service.AbstractTransportService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; -import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; @@ -141,9 +141,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: - //TODO: should we push the notification to the rule engine? if (checkConnected(ctx)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); + transportService.reportActivity(sessionInfo); + if (gatewaySessionHandler != null) { + gatewaySessionHandler.reportActivity(); + } } break; case DISCONNECT: @@ -394,7 +397,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); if (deviceSessionCtx.isConnected()) { - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); transportService.deregisterSession(sessionInfo); if (gatewaySessionHandler != null) { gatewaySessionHandler.onGatewayDisconnect(); @@ -466,16 +469,10 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - public static SessionEventMsg getSessionEventMsg(SessionEvent event) { - return SessionEventMsg.newBuilder() - .setSessionType(TransportProtos.SessionType.ASYNC) - .setEvent(event).build(); - } - @Override public void operationComplete(Future future) throws Exception { if (deviceSessionCtx.isConnected()) { - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); transportService.deregisterSession(sessionInfo); } } @@ -495,7 +492,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB()) .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB()) .build(); - transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null); + transportService.process(sessionInfo, AbstractTransportService.getSessionEventMsg(SessionEvent.OPEN), null); transportService.registerAsyncSession(sessionInfo, this); checkGatewaySession(); ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index d6000591a8..ac33ba677e 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.transport.service.AbstractTransportService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; @@ -118,7 +119,7 @@ public class GatewaySessionHandler { GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), mqttQoSMap); if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) { SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); - transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); + transportService.process(deviceSessionInfo, AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); @@ -334,7 +335,7 @@ public class GatewaySessionHandler { private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) { transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); - transportService.process(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + transportService.process(deviceSessionCtx.getSessionInfo(), AbstractTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); } @@ -360,11 +361,15 @@ public class GatewaySessionHandler { return context; } - public MqttTransportAdaptor getAdaptor() { + MqttTransportAdaptor getAdaptor() { return context.getAdaptor(); } - public int nextMsgId() { + int nextMsgId() { return deviceSessionCtx.nextMsgId(); } + + public void reportActivity() { + devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo())); + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index a47438f30d..8944e94a86 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -61,10 +61,14 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); + void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback); + void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout); + void reportActivity(SessionInfoProto sessionInfo); + void deregisterSession(SessionInfoProto sessionInfo); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 265dacb5de..3bb84ecbc4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -47,9 +47,14 @@ public abstract class AbstractTransportService implements TransportService { private String perTenantLimitsConf; @Value("${transport.rate_limits.tenant}") private String perDevicesLimitsConf; + @Value("${transport.sessions.inactivity_timeout}") + private long sessionInactivityTimeout; + @Value("${transport.sessions.report_timeout}") + private long sessionReportTimeout; protected ScheduledExecutorService schedulerExecutor; protected ExecutorService transportCallbackExecutor; + private ConcurrentMap sessions = new ConcurrentHashMap<>(); //TODO: Implement cleanup of this maps. @@ -59,7 +64,81 @@ public abstract class AbstractTransportService implements TransportService { @Override public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); - //TODO: monitor sessions periodically: PING REQ/RESP, etc. + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); + sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback) { + SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); + sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback) { + reportActivityInternal(sessionInfo); + } + + @Override + public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { + reportActivityInternal(sessionInfo); + } + + private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { + UUID sessionId = toId(sessionInfo); + SessionMetaData sessionMetaData = sessions.get(sessionId); + if (sessionMetaData != null) { + sessionMetaData.updateLastActivityTime(); + } + return sessionMetaData; + } + + private void checkInactivityAndReportActivity() { + long expTime = System.currentTimeMillis() - sessionInactivityTimeout; + sessions.forEach((uuid, sessionMD) -> { + if (sessionMD.getLastActivityTime() < expTime) { + if (log.isDebugEnabled()) { + log.debug("[{}] Session has expired due to last activity time: {}", toId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime()); + } + process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + sessions.remove(uuid); + sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); + } else { + process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) + .setRpcSubscription(sessionMD.isSubscribedToRPC()) + .setLastActivityTime(sessionMD.getLastActivityTime()).build(), null); + } + }); } @Override @@ -131,7 +210,7 @@ public abstract class AbstractTransportService implements TransportService { } } - protected UUID toId(TransportProtos.SessionInfoProto sessionInfo) { + private UUID toId(TransportProtos.SessionInfoProto sessionInfo) { return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); } @@ -147,6 +226,7 @@ public abstract class AbstractTransportService implements TransportService { } this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); } public void destroy() { @@ -161,4 +241,10 @@ public abstract class AbstractTransportService implements TransportService { transportCallbackExecutor.shutdownNow(); } } + + public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { + return TransportProtos.SessionEventMsg.newBuilder() + .setSessionType(TransportProtos.SessionType.ASYNC) + .setEvent(event).build(); + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java index 6774942de3..091096106b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/RemoteTransportService.java @@ -227,6 +227,17 @@ public class RemoteTransportService extends AbstractTransportService { } } + @Override + public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback) { + if (checkLimits(sessionInfo, callback)) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscriptionInfo(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + } + @Override public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, callback)) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java index 1de57114e4..8642e93a97 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -23,10 +23,25 @@ import org.thingsboard.server.gen.transport.TransportProtos; * Created by ashvayka on 15.10.18. */ @Data -public class SessionMetaData { +class SessionMetaData { private final TransportProtos.SessionInfoProto sessionInfo; private final TransportProtos.SessionType sessionType; private final SessionMsgListener listener; + private volatile long lastActivityTime; + private volatile boolean subscribedToAttributes; + private volatile boolean subscribedToRPC; + + SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) { + this.sessionInfo = sessionInfo; + this.sessionType = sessionType; + this.listener = listener; + this.lastActivityTime = System.currentTimeMillis(); + } + + void updateLastActivityTime() { + this.lastActivityTime = System.currentTimeMillis(); + } + } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 30351b0bfa..f96bcbe6d6 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -23,6 +23,9 @@ transport: bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" timeout: "${COAP_TIMEOUT:10000}" + sessions: + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" rate_limits: enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 001e08abdc..6d593ed1b4 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -24,6 +24,9 @@ server: transport: http: request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" + sessions: + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" rate_limits: enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e37d14e874..e7f8942e77 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -44,8 +44,8 @@ transport: # Type of the key store key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" sessions: - max_per_tenant: "${TB_TRANSPORT_SESSIONS_MAX_PER_TENANT:1000}" - max_per_device: "${TB_TRANSPORT_SESSIONS_MAX_PER_DEVICE:2}" + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" + report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" rate_limits: enabled: "${TB_TRANSPORT_RATE_LIMITS_ENABLED:false}" tenant: "${TB_TRANSPORT_RATE_LIMITS_TENANT:1000:1,20000:60}"