Merge branch 'master' of github.com:thingsboard/thingsboard into develop/3.0

This commit is contained in:
Igor Kulikov 2020-04-28 18:48:44 +03:00
commit a4cd853a20
6 changed files with 42 additions and 28 deletions

View File

@ -49,6 +49,8 @@ message SessionInfoProto {
int64 deviceIdLSB = 7;
string deviceName = 8;
string deviceType = 9;
int64 gwSessionIdMSB = 10;
int64 gwSessionIdLSB = 11;
}
enum SessionEvent {

View File

@ -100,7 +100,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
private volatile DeviceSessionCtx deviceSessionCtx;
private volatile GatewaySessionHandler gatewaySessionHandler;
MqttTransportHandler(MqttTransportContext context,SslHandler sslHandler) {
MqttTransportHandler(MqttTransportContext context, SslHandler sslHandler) {
this.sessionId = UUID.randomUUID();
this.context = context;
this.transportService = context.getTransportService();
@ -138,32 +138,17 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
break;
case PUBLISH:
processPublish(ctx, (MqttPublishMessage) msg);
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.reportActivity();
}
break;
case SUBSCRIBE:
processSubscribe(ctx, (MqttSubscribeMessage) msg);
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.reportActivity();
}
break;
case UNSUBSCRIBE:
processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.reportActivity();
}
break;
case PINGREQ:
if (checkConnected(ctx, msg)) {
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
transportService.reportActivity(sessionInfo);
if (gatewaySessionHandler != null) {
gatewaySessionHandler.reportActivity();
}
}
break;
case DISCONNECT:
@ -174,7 +159,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
default:
break;
}
}
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
@ -188,6 +172,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
if (gatewaySessionHandler != null) {
handleGatewayPublishMsg(topicName, msgId, mqttMsg);
transportService.reportActivity(sessionInfo);
}
} else {
processDevicePublish(ctx, mqttMsg, topicName, msgId);
@ -244,6 +229,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
TransportProtos.ClaimDeviceMsg claimDeviceMsg = adaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
transportService.process(sessionInfo, claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
} else {
transportService.reportActivity(sessionInfo);
}
} catch (AdaptorException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
@ -276,6 +263,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
List<Integer> grantedQoSList = new ArrayList<>();
boolean activityReported = false;
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
@ -284,11 +272,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null);
registerSubQoS(topic, grantedQoSList, reqQoS);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC:
@ -308,6 +298,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
grantedQoSList.add(FAILURE.value());
}
}
if (!activityReported) {
transportService.reportActivity(sessionInfo);
}
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList));
}
@ -320,6 +313,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (!checkConnected(ctx, mqttMsg)) {
return;
}
boolean activityReported = false;
log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId());
for (String topicName : mqttMsg.payload().topics()) {
mqttQoSMap.remove(new MqttTopicMatcher(topicName));
@ -327,10 +321,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: {
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
case MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC: {
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), null);
activityReported = true;
break;
}
}
@ -338,6 +334,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName);
}
}
if (!activityReported) {
transportService.reportActivity(sessionInfo);
}
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId()));
}

View File

@ -46,6 +46,8 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext imple
.setTenantIdLSB(deviceInfo.getTenantIdLSB())
.setDeviceName(deviceInfo.getDeviceName())
.setDeviceType(deviceInfo.getDeviceType())
.setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
.setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
.build();
setDeviceInfo(deviceInfo);
}

View File

@ -410,7 +410,7 @@ public class GatewaySessionHandler {
return deviceSessionCtx.nextMsgId();
}
public void reportActivity() {
devices.forEach((id, deviceCtx) -> transportService.reportActivity(deviceCtx.getSessionInfo()));
public UUID getSessionId() {
return sessionId;
}
}

View File

@ -379,6 +379,7 @@ public class DefaultTransportService implements TransportService {
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setClaimDevice(msg).build(), callback);
}
@ -401,23 +402,32 @@ public class DefaultTransportService implements TransportService {
private void checkInactivityAndReportActivity() {
long expTime = System.currentTimeMillis() - sessionInactivityTimeout;
sessions.forEach((uuid, sessionMD) -> {
if (sessionMD.getLastActivityTime() < expTime) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionMD.getSessionInfo()), sessionMD.getLastActivityTime());
long lastActivityTime = sessionMD.getLastActivityTime();
TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
if (sessionInfo.getGwSessionIdMSB() > 0 &&
sessionInfo.getGwSessionIdLSB() > 0) {
SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()));
if (gwMetaData != null) {
lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
}
process(sessionMD.getSessionInfo(), getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
}
if (lastActivityTime < expTime) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime);
}
process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
sessions.remove(uuid);
sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
} else {
if (sessionMD.getLastActivityTime() > sessionMD.getLastReportedActivityTime()) {
final long lastActivityTime = sessionMD.getLastActivityTime();
process(sessionMD.getSessionInfo(), TransportProtos.SubscriptionInfoProto.newBuilder()
if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
final long lastActivityTimeFinal = lastActivityTime;
process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(sessionMD.isSubscribedToAttributes())
.setRpcSubscription(sessionMD.isSubscribedToRPC())
.setLastActivityTime(sessionMD.getLastActivityTime()).build(), new TransportServiceCallback<Void>() {
.setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {
sessionMD.setLastReportedActivityTime(lastActivityTime);
sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
}
@Override

View File

@ -305,6 +305,7 @@
</properties>
<excludes>
<exclude>**/.env</exclude>
<exclude>**/*.env</exclude>
<exclude>**/.eslintrc</exclude>
<exclude>**/.babelrc</exclude>
<exclude>**/.jshintrc</exclude>