From eb3cc332d187464bbe296ef5ee8ebb81ca5be85e Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 28 Apr 2020 18:19:30 +0300 Subject: [PATCH] Fix gateway/device last activity time checks --- common/queue/src/main/proto/queue.proto | 2 ++ .../transport/mqtt/MqttTransportHandler.java | 33 +++++++++---------- .../mqtt/session/GatewayDeviceSessionCtx.java | 2 ++ .../mqtt/session/GatewaySessionHandler.java | 4 +-- .../service/DefaultTransportService.java | 28 +++++++++++----- pom.xml | 1 + 6 files changed, 42 insertions(+), 28 deletions(-) diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 51bc7649be..d3e850b39c 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -49,6 +49,8 @@ message SessionInfoProto { int64 deviceIdLSB = 7; string deviceName = 8; string deviceType = 9; + int64 gwSessionIdMSB = 10; + int64 gwSessionIdLSB = 11; } enum SessionEvent { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d0f9c489cc..351be0865f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private volatile DeviceSessionCtx deviceSessionCtx; private volatile GatewaySessionHandler gatewaySessionHandler; - MqttTransportHandler(MqttTransportContext context,SslHandler sslHandler) { + MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) { this.sessionId = UUID.randomUUID(); this.context = context; this.transportService = context.getTransportService(); @@ -138,32 +138,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement break; case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); - transportService.reportActivity(sessionInfo); - if (gatewaySessionHandler != null) { - gatewaySessionHandler.reportActivity(); - } break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); - transportService.reportActivity(sessionInfo); - if (gatewaySessionHandler != null) { - gatewaySessionHandler.reportActivity(); - } break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); - transportService.reportActivity(sessionInfo); - if (gatewaySessionHandler != null) { - gatewaySessionHandler.reportActivity(); - } break; case PINGREQ: if (checkConnected(ctx, msg)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); transportService.reportActivity(sessionInfo); - if (gatewaySessionHandler != null) { - gatewaySessionHandler.reportActivity(); - } } break; case DISCONNECT: @@ -174,7 +159,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement default: break; } - } private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { @@ -188,6 +172,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { if (gatewaySessionHandler != null) { handleGatewayPublishMsg(topicName, msgId, mqttMsg); + transportService.reportActivity(sessionInfo); } } else { processDevicePublish(ctx, mqttMsg, topicName, msgId); @@ -244,6 +229,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) { TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg); transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg)); + } else { + transportService.reportActivity(sessionInfo); } } catch (AdaptorException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); @@ -276,6 +263,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); List grantedQoSList = new ArrayList<>(); + boolean activityReported = false; for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); @@ -284,11 +272,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); registerSubQoS(topic, grantedQoSList, reqQoS); + activityReported = true; break; } case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); registerSubQoS(topic, grantedQoSList, reqQoS); + activityReported = true; break; } case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC: @@ -308,6 +298,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement grantedQoSList.add(FAILURE.value()); } } + if (!activityReported) { + transportService.reportActivity(sessionInfo); + } ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); } @@ -320,6 +313,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (!checkConnected(ctx, mqttMsg)) { return; } + boolean activityReported = false; log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); for (String topicName : mqttMsg.payload().topics()) { mqttQoSMap.remove(new MqttTopicMatcher(topicName)); @@ -327,10 +321,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement switch (topicName) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: { transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null); + activityReported = true; break; } case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: { transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null); + activityReported = true; break; } } @@ -338,6 +334,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); } } + if (!activityReported) { + transportService.reportActivity(sessionInfo); + } ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java index b4b5f98238..c137da0b82 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewayDeviceSessionCtx.java @@ -46,6 +46,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple .setTenantIdLSB(deviceInfo.getTenantIdLSB()) .setDeviceName(deviceInfo.getDeviceName()) .setDeviceType(deviceInfo.getDeviceType()) + .setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits()) + .setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits()) .build(); setDeviceInfo(deviceInfo); } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index de5f82f324..65393d6939 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -410,7 +410,7 @@ public class GatewaySessionHandler { return deviceSessionCtx.nextMsgId(); } - public void reportActivity() { - devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo())); + public UUID getSessionId() { + return sessionId; } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 3db7300bc3..14f01a6d63 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -379,6 +379,7 @@ public class DefaultTransportService implements TransportService { @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { + reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) .setClaimDevice(msg).build(), callback); } @@ -401,23 +402,32 @@ public class DefaultTransportService implements TransportService { private void checkInactivityAndReportActivity() { long expTime = System.currentTimeMillis() - sessionInactivityTimeout; sessions.forEach((uuid, sessionMD) -> { - if (sessionMD.getLastActivityTime() < expTime) { - if (log.isDebugEnabled()) { - log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime()); + long lastActivityTime = sessionMD.getLastActivityTime(); + TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo(); + if (sessionInfo.getGwSessionIdMSB() > 0 && + sessionInfo.getGwSessionIdLSB() > 0) { + SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB())); + if (gwMetaData != null) { + lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime); } - process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + } + if (lastActivityTime < expTime) { + if (log.isDebugEnabled()) { + log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime); + } + process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); sessions.remove(uuid); sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); } else { - if (sessionMD.getLastActivityTime() > sessionMD.getLastReportedActivityTime()) { - final long lastActivityTime = sessionMD.getLastActivityTime(); - process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder() + if (lastActivityTime > sessionMD.getLastReportedActivityTime()) { + final long lastActivityTimeFinal = lastActivityTime; + process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) .setRpcSubscription(sessionMD.isSubscribedToRPC()) - .setLastActivityTime(sessionMD.getLastActivityTime()).build(), new TransportServiceCallback() { + .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback() { @Override public void onSuccess(Void msg) { - sessionMD.setLastReportedActivityTime(lastActivityTime); + sessionMD.setLastReportedActivityTime(lastActivityTimeFinal); } @Override diff --git a/pom.xml b/pom.xml index 845d590faf..494a43cb56 100755 --- a/pom.xml +++ b/pom.xml @@ -305,6 +305,7 @@ **/.env + **/*.env **/.eslintrc **/.babelrc **/.jshintrc