From 03ca18b64be1eeba366ee386098604acabcef7c3 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 7 May 2024 17:12:59 +0300 Subject: [PATCH 1/2] Added MQTT reason codes from netty library --- .../device/DeviceActorMessageProcessor.java | 13 +- .../src/main/resources/thingsboard.yml | 2 + .../server/common/adaptor/JsonConverter.java | 7 + common/proto/src/main/proto/queue.proto | 13 ++ .../transport/mqtt/MqttTransportContext.java | 4 + .../transport/mqtt/MqttTransportHandler.java | 202 +++++++++++------- .../BackwardCompatibilityAdaptor.java | 8 +- .../mqtt/adaptors/JsonMqttAdaptor.java | 5 + .../mqtt/adaptors/MqttTransportAdaptor.java | 2 + .../mqtt/adaptors/ProtoMqttAdaptor.java | 9 + .../AbstractGatewaySessionHandler.java | 40 +++- .../transport/mqtt/util/ReturnCode.java | 104 --------- .../mqtt/util/ReturnCodeResolver.java | 30 +-- .../server/msa/AbstractContainerTest.java | 2 + .../msa/connectivity/MqttClientTest.java | 141 +++++++++++- .../thingsboard/mqtt/MqttChannelHandler.java | 23 +- .../thingsboard/mqtt/MqttClientCallback.java | 21 ++ .../org/thingsboard/mqtt/MqttClientImpl.java | 10 +- .../src/main/resources/tb-mqtt-transport.yml | 2 + 19 files changed, 426 insertions(+), 212 deletions(-) delete mode 100644 common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCode.java diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 253169a51d..7432038096 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -90,6 +90,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCre import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.gen.transport.TransportProtos.UplinkNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseReason; import org.thingsboard.server.service.rpc.RpcSubmitStrategy; import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -845,7 +846,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials()); }); } else { - sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!")); + sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!", SessionCloseReason.CREDENTIALS_UPDATED)); attributeSubscriptions.clear(); rpcSubscriptions.clear(); dumpSessions(); @@ -855,13 +856,15 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) { log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId: [{}] sessionMd: [{}]", sessionId, sessionMd); - notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!"); + notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!", SessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED); } - private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message) { + private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message, SessionCloseReason reason) { SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto .newBuilder() - .setMessage(message).build(); + .setMessage(message) + .setReason(reason) + .build(); ToTransportMsg msg = ToTransportMsg.newBuilder() .setSessionIdMSB(sessionId.getMostSignificantBits()) .setSessionIdLSB(sessionId.getLeastSignificantBits()) @@ -1044,7 +1047,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso attributeSubscriptions.remove(id); if (session != null) { removed++; - notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE); + notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE, SessionCloseReason.SESSION_TIMEOUT); } } if (removed != 0) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4c4fad1238..83b12f6c16 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -966,6 +966,8 @@ transport: proxy_enabled: "${MQTT_PROXY_PROTOCOL_ENABLED:false}" # MQTT processing timeout in milliseconds timeout: "${MQTT_TIMEOUT:10000}" + # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. + disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism netty: # Netty leak detector level diff --git a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index 7cfef02873..f5c7bbb901 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -466,6 +466,13 @@ public class JsonConverter { return result; } + public static JsonObject toGatewayDeviceDisconnectJson(String deviceName, int reasonCode) { + JsonObject result = new JsonObject(); + result.addProperty(DEVICE_PROPERTY, deviceName); + result.addProperty("reason", reasonCode); + return result; + } + public static JsonElement toErrorJson(String errorMsg) { JsonObject error = new JsonObject(); error.addProperty("error", errorMsg); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 7b2aa2fe1b..c5c0957c18 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -395,6 +395,11 @@ message GetOrCreateDeviceFromGatewayResponseMsg { TransportApiRequestErrorCode error = 3; } +message GatewayDisconnectDeviceMsg { + string deviceName = 1; + int32 reasonCode = 2; +} + enum TransportApiRequestErrorCode { UNKNOWN_TRANSPORT_API_ERROR = 0; ENTITY_LIMIT = 1; @@ -575,8 +580,16 @@ message ResourceDeleteMsg { string resourceKey = 4; } +enum SessionCloseReason { + UNKNOWN_REASON = 0; + CREDENTIALS_UPDATED = 1; + MAX_CONCURRENT_SESSIONS_LIMIT_REACHED = 2; + SESSION_TIMEOUT = 3; +} + message SessionCloseNotificationProto { string message = 1; + SessionCloseReason reason = 2; } message SubscribeToAttributeUpdatesMsg { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java index e035b2a861..9556180300 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportContext.java @@ -71,6 +71,10 @@ public class MqttTransportContext extends TransportContext { @Value("${transport.mqtt.timeout:10000}") private long timeout; + @Getter + @Value("${transport.mqtt.disconnect_timeout:1000}") + private long disconnectTimeout; + @Getter @Value("${transport.mqtt.proxy_enabled:false}") private boolean proxyEnabled; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index b48cb39ca0..e99c42caa2 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -31,6 +31,7 @@ import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; @@ -68,7 +69,6 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; -import org.thingsboard.server.common.transport.service.DefaultTransportService; import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.gen.transport.TransportProtos; @@ -82,7 +82,6 @@ import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler; import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher; import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler; -import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.ReturnCodeResolver; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugMessageType; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugRpcRequestHeader; @@ -196,23 +195,54 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement processMqttMsg(ctx, message); } else { log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage()); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.MALFORMED_PACKET); } } else { log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName()); - closeCtx(ctx); + closeCtx(ctx, (MqttMessage) null); } } finally { ReferenceCountUtil.safeRelease(msg); } } - private void closeCtx(ChannelHandlerContext ctx) { + private void closeCtx(ChannelHandlerContext ctx, MqttReasonCodes.Disconnect returnCode) { + closeCtx(ctx, returnCode.byteValue()); + } + + private void closeCtx(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) { + closeCtx(ctx, ReturnCodeResolver.getConnectionReturnCode(deviceSessionCtx.getMqttVersion(), returnCode).byteValue()); + } + + private void closeCtx(ChannelHandlerContext ctx, byte returnCode) { + closeCtx(ctx, createMqttDisconnectMsg(deviceSessionCtx, returnCode)); + } + + private void closeCtx(ChannelHandlerContext ctx, MqttMessage msg) { if (!rpcAwaitingAck.isEmpty()) { log.debug("[{}] Cleanup RPC awaiting ack map due to session close!", sessionId); rpcAwaitingAck.clear(); } - ctx.close(); + if (ctx.channel() != null && ctx.channel().isOpen()) { + if (msg != null && MqttVersion.MQTT_5 == deviceSessionCtx.getMqttVersion()) { + ChannelFuture channelFuture = ctx.writeAndFlush(msg).addListener(future -> ctx.close()); + scheduler.schedule(() -> { + if (!channelFuture.isDone()) { + log.debug("[{}] Closing channel due to timeout!", sessionId); + ctx.close(); + } + }, context.getDisconnectTimeout(), TimeUnit.MILLISECONDS); + } else { + ctx.close(); + } + } else { + if (ctx.channel() != null) { + log.debug("[{}] Channel is already closed!", sessionId); + } else { + log.debug("[{}] Channel is null, closing ctx...", sessionId); + ctx.close(); + } + } } InetSocketAddress getAddress(ChannelHandlerContext ctx) { @@ -231,7 +261,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { if (msg.fixedHeader() == null) { log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.PROTOCOL_ERROR); return; } deviceSessionCtx.setChannel(ctx); @@ -268,21 +298,23 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } else { log.debug("[{}] Unsupported topic for provisioning requests: {}!", sessionId, topicName); - closeCtx(ctx); + ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID); + closeCtx(ctx, MqttReasonCodes.Disconnect.TOPIC_NAME_INVALID); } } catch (RuntimeException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - closeCtx(ctx); + ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } catch (AdaptorException e) { log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - closeCtx(ctx); + sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break; case DISCONNECT: - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.NORMAL_DISCONNECT); break; } } @@ -292,7 +324,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) { log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}", deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize()); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.QUOTA_EXCEEDED); return; } @@ -329,7 +361,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } break; case DISCONNECT: - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.NORMAL_DISCONNECT); break; case PUBACK: int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId(); @@ -389,15 +421,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement gatewaySessionHandler.onDeviceDisconnect(mqttMsg); break; default: - ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID); + ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID); } } catch (RuntimeException e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); - closeCtx(ctx); + ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } catch (AdaptorException e) { log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - sendAckOrCloseSession(ctx, topicName, msgId); + sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } } @@ -433,11 +465,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } catch (RuntimeException e) { log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); - closeCtx(ctx); + ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } catch (AdaptorException | ThingsboardException | InvalidProtocolBufferException e) { log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - sendAckOrCloseSession(ctx, topicName, msgId); + sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } } @@ -530,11 +562,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement attrReqTopicType = TopicType.V2; } else { transportService.recordActivity(deviceSessionCtx.getSessionInfo()); - ack(ctx, msgId, ReturnCode.TOPIC_NAME_INVALID); + ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID); } } catch (AdaptorException e) { log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); - sendAckOrCloseSession(ctx, topicName, msgId); + sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } } @@ -548,13 +580,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void sendAckOrCloseSession(ChannelHandlerContext ctx, String topicName, int msgId) { + private void sendResponseForAdaptorErrorOrCloseContext(ChannelHandlerContext ctx, String topicName, int msgId) { if ((deviceSessionCtx.isSendAckOnValidationException() || MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) && msgId > 0) { log.debug("[{}] Send pub ack on invalid publish msg [{}][{}]", sessionId, topicName, msgId); - ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.PAYLOAD_FORMAT_INVALID)); + ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID.byteValue())); } else { log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID); } } @@ -593,7 +625,11 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - private void ack(ChannelHandlerContext ctx, int msgId, ReturnCode returnCode) { + private void ack(ChannelHandlerContext ctx, int msgId, MqttReasonCodes.PubAck returnCode) { + ack(ctx, msgId, returnCode.byteValue()); + } + + private void ack(ChannelHandlerContext ctx, int msgId, byte returnCode) { if (msgId > 0) { ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode)); } @@ -604,13 +640,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onSuccess(Void dummy) { log.trace("[{}] Published msg: {}", sessionId, msg); - ack(ctx, msgId, ReturnCode.SUCCESS); + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); } @Override public void onError(Throwable e) { log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } }; } @@ -629,7 +665,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onSuccess(TransportProtos.ProvisionDeviceResponseMsg provisionResponseMsg) { log.trace("[{}] Published msg: {}", sessionId, msg); - ack(ctx, msgId, ReturnCode.SUCCESS); + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); try { if (deviceSessionCtx.getProvisionPayloadType().equals(TransportPayloadType.JSON)) { deviceSessionCtx.getContext().getJsonMqttAdaptor().convertToPublish(deviceSessionCtx, provisionResponseMsg).ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); @@ -645,8 +681,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e); - ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); - closeCtx(ctx); + ack(ctx, msgId, MqttReasonCodes.PubAck.IMPLEMENTATION_SPECIFIC_ERROR); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } } @@ -681,13 +717,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { log.trace("[{}] Failed to get firmware: {}", sessionId, msg, e); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } } private void sendOtaPackage(ChannelHandlerContext ctx, int msgId, String firmwareId, String requestId, int chunkSize, int chunk, OtaPackageType type) { log.trace("[{}] Send firmware [{}] to device!", sessionId, firmwareId); - ack(ctx, msgId, ReturnCode.SUCCESS); + ack(ctx, msgId, MqttReasonCodes.PubAck.SUCCESS); try { byte[] firmwareChunk = context.getOtaPackageDataCache().get(firmwareId, chunkSize, chunk); deviceSessionCtx.getPayloadAdaptor() @@ -703,13 +739,12 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement deviceSessionCtx.getChannel().writeAndFlush(deviceSessionCtx .getPayloadAdaptor() .createMqttPublishMsg(deviceSessionCtx, MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC, error.getBytes())); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { if (!checkConnected(ctx, mqttMsg)) { - int returnCode = ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.NOT_AUTHORIZED_5); - ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(returnCode))); + ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); @@ -718,7 +753,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); - if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)){ + if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); activityReported = true; continue; @@ -789,13 +824,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement break; default: log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.TOPIC_FILTER_INVALID)); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); break; } } } catch (Exception e) { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS, e); - grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), ReturnCode.IMPLEMENTATION_SPECIFIC)); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.IMPLEMENTATION_SPECIFIC_ERROR)); } } if (!activityReported) { @@ -832,7 +867,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { if (!checkConnected(ctx, mqttMsg)) { - ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(ReturnCode.NOT_AUTHORIZED_5.shortValue()))); + ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), + Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue()))); return; } boolean activityReported = false; @@ -843,7 +879,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement if (mqttQoSMap.containsKey(matcher)) { mqttQoSMap.remove(matcher); try { - short resultValue = ReturnCode.SUCCESS.shortValue(); + short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue(); switch (topicName) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: @@ -884,16 +920,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } default: log.trace("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); - resultValue = ReturnCode.TOPIC_FILTER_INVALID.shortValue(); + resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue(); } unSubResults.add(resultValue); } catch (Exception e) { log.debug("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); - unSubResults.add(ReturnCode.IMPLEMENTATION_SPECIFIC.shortValue()); + unSubResults.add((short) MqttReasonCodes.UnsubAck.IMPLEMENTATION_SPECIFIC_ERROR.byteValue()); } } else { log.debug("[{}] Failed to process unsubscription [{}] to [{}] - Subscription not found", sessionId, mqttMsg.variableHeader().messageId(), topicName); - unSubResults.add(ReturnCode.NO_SUBSCRIPTION_EXISTED.shortValue()); + unSubResults.add((short)MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); } } if (!activityReported) { @@ -918,7 +954,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version())); if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) { deviceSessionCtx.setProvisionOnly(true); - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED, msg)); } else { X509Certificate cert; if (sslHandler != null && (cert = getX509Certificate()) != null) { @@ -952,8 +988,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, userName, e); - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); - closeCtx(ctx); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage)); + closeCtx(ctx, MqttReasonCodes.Disconnect.SERVER_BUSY); } }); } @@ -975,15 +1011,15 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void onError(Throwable e) { log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); - closeCtx(ctx); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage)); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } }); } catch (Exception e) { context.onAuthFailure(address); - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.NOT_AUTHORIZED_5, connectMessage)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5, connectMessage)); log.trace("[{}] X509 auth failure: {}", sessionId, address, e); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.NOT_AUTHORIZED); } } @@ -1000,7 +1036,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement return null; } - private MqttConnAckMessage createMqttConnAckMsg(ReturnCode returnCode, MqttConnectMessage msg) { + private MqttConnAckMessage createMqttConnAckMsg(MqttConnectReturnCode returnCode, MqttConnectMessage msg) { MqttMessageBuilders.ConnAckBuilder connAckBuilder = MqttMessageBuilders.connAck(); connAckBuilder.sessionPresent(!msg.variableHeader().isCleanSession()); MqttConnectReturnCode finalReturnCode = ReturnCodeResolver.getConnectionReturnCode(deviceSessionCtx.getMqttVersion(), returnCode); @@ -1031,18 +1067,18 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement log.error("[{}] Unexpected Exception", sessionId, cause); } - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.SERVER_SHUTTING_DOWN); if (cause instanceof OutOfMemoryError) { log.error("Received critical error. Going to shutdown the service."); System.exit(1); } } - private static MqttSubAckMessage createSubAckMessage(Integer msgId, List grantedQoSList) { + private static MqttSubAckMessage createSubAckMessage(Integer msgId, List reasonCodes) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(SUBACK, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); - MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(grantedQoSList); + MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(reasonCodes); return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); } @@ -1061,14 +1097,22 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } - public static MqttMessage createMqttPubAckMsg(DeviceSessionCtx deviceSessionCtx, int requestId, ReturnCode returnCode) { + public static MqttMessage createMqttPubAckMsg(DeviceSessionCtx deviceSessionCtx, int requestId, byte returnCode) { MqttMessageBuilders.PubAckBuilder pubAckMsgBuilder = MqttMessageBuilders.pubAck().packetId(requestId); if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { - pubAckMsgBuilder.reasonCode(returnCode.byteValue()); + pubAckMsgBuilder.reasonCode(returnCode); } return pubAckMsgBuilder.build(); } + public static MqttMessage createMqttDisconnectMsg(DeviceSessionCtx deviceSessionCtx, byte returnCode) { + MqttMessageBuilders.DisconnectBuilder disconnectBuilder = MqttMessageBuilders.disconnect(); + if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { + disconnectBuilder.reasonCode(returnCode); + } + return disconnectBuilder.build(); + } + private boolean checkConnected(ChannelHandlerContext ctx, MqttMessage msg) { if (deviceSessionCtx.isConnected()) { return true; @@ -1115,8 +1159,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } catch (Exception e) { log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); - closeCtx(ctx); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage)); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } } @@ -1160,20 +1204,20 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void onValidateDeviceResponse(ValidateDeviceCredentialsResponse msg, ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { if (!msg.hasDeviceInfo()) { context.onAuthFailure(address); - ReturnCode returnCode = ReturnCode.NOT_AUTHORIZED_5; + MqttConnectReturnCode returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5; if (sslHandler == null || getX509Certificate() == null) { String username = connectMessage.payload().userName(); byte[] passwordBytes = connectMessage.payload().passwordInBytes(); String clientId = connectMessage.payload().clientIdentifier(); if ((username != null && passwordBytes != null && clientId != null) || (username == null ^ passwordBytes == null)) { - returnCode = ReturnCode.BAD_USERNAME_OR_PASSWORD; + returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD; } else if (!StringUtils.isBlank(clientId)) { - returnCode = ReturnCode.CLIENT_IDENTIFIER_NOT_VALID; + returnCode = MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID; } } ctx.writeAndFlush(createMqttConnAckMsg(returnCode, connectMessage)); - closeCtx(ctx); + closeCtx(ctx, returnCode); } else { context.onAuthSuccess(address); deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); @@ -1188,7 +1232,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else { checkGatewaySession(sessionMetaData); } - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, connectMessage)); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_ACCEPTED, connectMessage)); deviceSessionCtx.setConnected(true); log.debug("[{}] Client connected!", sessionId); transportService.getCallbackExecutor().execute(() -> processMsgQueue(ctx)); //this callback will execute in Producer worker thread and hard or blocking work have to be submitted to the separate thread. @@ -1198,11 +1242,13 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onError(Throwable e) { if (e instanceof TbRateLimitsException) { log.trace("[{}] Failed to submit session event: {}", sessionId, e.getMessage()); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED, connectMessage)); + closeCtx(ctx, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); } else { log.warn("[{}] Failed to submit session event", sessionId, e); + ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE_5, connectMessage)); + closeCtx(ctx, MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR); } - ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); - closeCtx(ctx); } }); } @@ -1231,8 +1277,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), SparkplugMessageType.NCMD); sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, - sparkplugTopic.toString(), - sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) + sparkplugTopic.toString(), + sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(sparkplugSessionHandler::writeAndFlush); } }); @@ -1250,7 +1296,19 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); - closeCtx(deviceSessionCtx.getChannel()); + MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.IMPLEMENTATION_SPECIFIC_ERROR; + switch (sessionCloseNotification.getReason()) { + case CREDENTIALS_UPDATED: + returnCode = MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION; + break; + case MAX_CONCURRENT_SESSIONS_LIMIT_REACHED: + returnCode = MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER; + break; + case SESSION_TIMEOUT: + returnCode = MqttReasonCodes.Disconnect.MAXIMUM_CONNECT_TIME; + break; + } + closeCtx(deviceSessionCtx.getChannel(), returnCode); } @Override @@ -1287,8 +1345,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement SparkplugTopic sparkplugTopic = new SparkplugTopic(sparkplugSessionHandler.getSparkplugTopicNode(), messageType); sparkplugSessionHandler.createSparkplugMqttPublishMsg(tsKvProto, - sparkplugTopic.toString(), - sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) + sparkplugTopic.toString(), + sparkplugSessionHandler.getNodeBirthMetrics().get(tsKvProto.getKv().getKey())) .ifPresent(payload -> sendToDeviceRpcRequest(payload, rpcRequest, deviceSessionCtx.getSessionInfo())); } else { sendErrorRpcResponse(deviceSessionCtx.getSessionInfo(), rpcRequest.getRequestId(), @@ -1369,7 +1427,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void onDeviceDeleted(DeviceId deviceId) { context.onAuthFailure(address); ChannelHandlerContext ctx = deviceSessionCtx.getChannel(); - closeCtx(ctx); + closeCtx(ctx, MqttReasonCodes.Disconnect.ADMINISTRATIVE_ACTION); } public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java index e85decf419..c4cfd13dba 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java @@ -20,8 +20,8 @@ import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext; @@ -146,6 +146,12 @@ public class BackwardCompatibilityAdaptor implements MqttTransportAdaptor { return Optional.empty(); } + @Override + public Optional convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) throws AdaptorException { + log.warn("[{}] invoked not implemented adaptor method! Device name: {} ReasonCode: {}", ctx.getSessionId(), deviceName, reasonCode); + return Optional.empty(); + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException { return protoAdaptor.convertToPublish(ctx, firmwareChunk, requestId, chunk, firmwareType); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 90174616e6..8cd215f1f1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -154,6 +154,11 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor { return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, JsonConverter.toJson(provisionResponse))); } + @Override + public Optional convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) { + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_DISCONNECT_TOPIC, JsonConverter.toGatewayDeviceDisconnectJson(deviceName, reasonCode))); + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) { return Optional.of(createMqttPublishMsg(ctx, String.format(DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT, firmwareType.getKeyPrefix(), requestId, chunk), firmwareChunk)); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 34d80411e5..9cf15f9de1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -80,6 +80,8 @@ public interface MqttTransportAdaptor { Optional convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException; + Optional convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) throws AdaptorException; + default MqttPublishMessage createMqttPublishMsg(MqttDeviceAwareSessionContext ctx, String topic, byte[] payloadInBytes) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, ctx.getQoSForTopic(topic), false, 0); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java index 25113dedca..2fc75df026 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java @@ -173,6 +173,15 @@ public class ProtoMqttAdaptor implements MqttTransportAdaptor { return Optional.of(createMqttPublishMsg(ctx, MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC, provisionResponse.toByteArray())); } + @Override + public Optional convertToGatewayDeviceDisconnectPublish(MqttDeviceAwareSessionContext ctx, String deviceName, int reasonCode) { + TransportProtos.GatewayDisconnectDeviceMsg gatewayDeviceDisconnectMsg = TransportProtos.GatewayDisconnectDeviceMsg.newBuilder() + .setDeviceName(deviceName) + .setReasonCode(reasonCode) + .build(); + return Optional.of(createMqttPublishMsg(ctx, MqttTopics.GATEWAY_DISCONNECT_TOPIC, gatewayDeviceDisconnectMsg.toByteArray())); + } + @Override public Optional convertToPublish(MqttDeviceAwareSessionContext ctx, byte[] firmwareChunk, String requestId, int chunk, OtaPackageType firmwareType) throws AdaptorException { return Optional.of(createMqttPublishMsg(ctx, String.format(DEVICE_SOFTWARE_FIRMWARE_RESPONSES_TOPIC_FORMAT, firmwareType.getKeyPrefix(), requestId, chunk), firmwareChunk)); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index d033774e93..c479383891 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -33,6 +33,9 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttReasonCodes; +import io.netty.handler.codec.mqtt.MqttVersion; +import jakarta.annotation.Nullable; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -46,6 +49,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; @@ -59,10 +63,8 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; -import org.thingsboard.server.transport.mqtt.util.ReturnCode; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; -import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -227,7 +229,7 @@ public abstract class AbstractGatewaySessionHandler() { @Override public void onSuccess(@Nullable T result) { - ack(msg, ReturnCode.SUCCESS); + ack(msg, MqttReasonCodes.PubAck.SUCCESS); log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName); } @@ -363,7 +365,7 @@ public abstract class AbstractGatewaySessionHandler 0) { - writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode)); + writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, returnCode.byteValue())); } } @@ -786,16 +788,38 @@ public abstract class AbstractGatewaySessionHandler 0) { - ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS)); + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.SUCCESS.byteValue())); + } else { + log.trace("[{}][{}][{}] Wrong msg id: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg); + ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, MqttReasonCodes.PubAck.UNSPECIFIED_ERROR.byteValue())); + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MALFORMED_PACKET); } } @Override public void onError(Throwable e) { log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e); + if (e instanceof TbRateLimitsException) { + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.MESSAGE_RATE_TOO_HIGH); + } else { + closeDeviceSession(deviceName, MqttReasonCodes.Disconnect.UNSPECIFIED_ERROR); + } ctx.close(); } }; } + + private void closeDeviceSession(String deviceName, MqttReasonCodes.Disconnect returnCode) { + try { + if (MqttVersion.MQTT_5.equals(deviceSessionCtx.getMqttVersion())) { + MqttTransportAdaptor adaptor = deviceSessionCtx.getPayloadAdaptor(); + int returnCodeValue = returnCode.byteValue() & 0xFF; + Optional deviceDisconnectPublishMsg = adaptor.convertToGatewayDeviceDisconnectPublish(deviceSessionCtx, deviceName, returnCodeValue); + deviceDisconnectPublishMsg.ifPresent(deviceSessionCtx.getChannel()::writeAndFlush); + } + } catch (Exception e) { + log.trace("Failed to send device disconnect to gateway session", e); + } + } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCode.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCode.java deleted file mode 100644 index 14aa799ab2..0000000000 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCode.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.transport.mqtt.util; - -public enum ReturnCode { - SUCCESS((byte) 0x00), - //MQTT 3 codes - UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01), - IDENTIFIER_REJECTED((byte) 0x02), - SERVER_UNAVAILABLE((byte) 0x03), - BAD_USER_NAME_OR_PASSWORD((byte) 0x04), - NOT_AUTHORIZED((byte) 0x05), - //MQTT 5 codes - NO_MATCHING_SUBSCRIBERS((byte) 0x10), - NO_SUBSCRIPTION_EXISTED((byte) 0x11), - CONTINUE_AUTHENTICATION((byte) 0x18), - REAUTHENTICATE((byte) 0x19), - UNSPECIFIED_ERROR((byte) 0x80), - MALFORMED_PACKET((byte) 0x81), - PROTOCOL_ERROR((byte) 0x82), - IMPLEMENTATION_SPECIFIC((byte) 0x83), - UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84), - CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85), - BAD_USERNAME_OR_PASSWORD((byte) 0x86), - NOT_AUTHORIZED_5((byte) 0x87), - SERVER_UNAVAILABLE_5((byte) 0x88), - SERVER_BUSY((byte) 0x89), - BANNED((byte) 0x8A), - SERVER_SHUTTING_DOWN((byte) 0x8B), - BAD_AUTHENTICATION_METHOD((byte) 0x8C), - KEEP_ALIVE_TIMEOUT((byte) 0x8D), - SESSION_TAKEN_OVER((byte) 0x8E), - TOPIC_FILTER_INVALID((byte) 0x8F), - TOPIC_NAME_INVALID((byte) 0x90), - PACKET_IDENTIFIER_IN_USE((byte) 0x91), - PACKET_IDENTIFIER_NOT_FOUND((byte) 0x92), - RECEIVE_MAXIMUM_EXCEEDED((byte) 0x93), - TOPIC_ALIAS_INVALID((byte) 0x94), - PACKET_TOO_LARGE((byte) 0x95), - MESSAGE_RATE_TOO_HIGH((byte) 0x96), - QUOTA_EXCEEDED((byte) 0x97), - ADMINISTRATIVE_ACTION((byte) 0x98), - PAYLOAD_FORMAT_INVALID((byte) 0x99), - RETAIN_NOT_SUPPORTED((byte) 0x9A), - QOS_NOT_SUPPORTED((byte) 0x9B), - USE_ANOTHER_SERVER((byte) 0x9C), - SERVER_MOVED((byte) 0x9D), - SHARED_SUBSCRIPTION_NOT_SUPPORTED((byte) 0x9E), - CONNECTION_RATE_EXCEEDED((byte) 0x9F), - MAXIMUM_CONNECT_TIME((byte) 0xA0), - SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED((byte) 0xA1), - WILDCARD_SUBSCRIPTION_NOT_SUPPORTED((byte) 0xA2); - - private static final ReturnCode[] VALUES; - - static { - ReturnCode[] values = values(); - VALUES = new ReturnCode[163]; - for (ReturnCode code : values) { - final int unsignedByte = code.byteValue & 0xFF; - // Suppress a warning about out of bounds access since the enum contains only correct values - VALUES[unsignedByte] = code; // lgtm [java/index-out-of-bounds] - } - } - - private final byte byteValue; - - ReturnCode(byte byteValue) { - this.byteValue = byteValue; - } - - public byte byteValue() { - return byteValue; - } - - public short shortValue(){return byteValue;} - - public static ReturnCode valueOf(byte b) { - final int unsignedByte = b & 0xFF; - ReturnCode mqttConnectReturnCode = null; - try { - mqttConnectReturnCode = VALUES[unsignedByte]; - } catch (ArrayIndexOutOfBoundsException ignored) { - // no op - } - if (mqttConnectReturnCode == null) { - throw new IllegalArgumentException("unknown connect return code: " + unsignedByte); - } - return mqttConnectReturnCode; - } -} \ No newline at end of file diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCodeResolver.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCodeResolver.java index f2e6097222..f411836650 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCodeResolver.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/ReturnCodeResolver.java @@ -17,22 +17,24 @@ package org.thingsboard.server.transport.mqtt.util; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttVersion; import lombok.extern.slf4j.Slf4j; @Slf4j public class ReturnCodeResolver { - public static MqttConnectReturnCode getConnectionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) { - if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) { + public static MqttConnectReturnCode getConnectionReturnCode(MqttVersion mqttVersion, MqttConnectReturnCode returnCode) { + if (!MqttVersion.MQTT_5.equals(mqttVersion) && !MqttConnectReturnCode.CONNECTION_ACCEPTED.equals(returnCode)) { switch (returnCode) { - case BAD_USERNAME_OR_PASSWORD: - case NOT_AUTHORIZED_5: + case CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD: + case CONNECTION_REFUSED_NOT_AUTHORIZED_5: return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; - case SERVER_UNAVAILABLE_5: - return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; - case CLIENT_IDENTIFIER_NOT_VALID: + case CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID: return MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED; + case CONNECTION_REFUSED_SERVER_UNAVAILABLE_5: + case CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED: + return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; default: log.warn("Unknown return code for conversion: {}", returnCode.name()); } @@ -40,18 +42,20 @@ public class ReturnCodeResolver { return MqttConnectReturnCode.valueOf(returnCode.byteValue()); } - public static int getSubscriptionReturnCode(MqttVersion mqttVersion, ReturnCode returnCode) { - if (!MqttVersion.MQTT_5.equals(mqttVersion) && !ReturnCode.SUCCESS.equals(returnCode)) { + public static int getSubscriptionReturnCode(MqttVersion mqttVersion, MqttReasonCodes.SubAck returnCode) { + if (!MqttVersion.MQTT_5.equals(mqttVersion) && !(MqttReasonCodes.SubAck.GRANTED_QOS_0.equals(returnCode) || + MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(returnCode) || + MqttReasonCodes.SubAck.GRANTED_QOS_2.equals(returnCode))) { switch (returnCode) { case UNSPECIFIED_ERROR: case TOPIC_FILTER_INVALID: - case IMPLEMENTATION_SPECIFIC: - case NOT_AUTHORIZED_5: + case IMPLEMENTATION_SPECIFIC_ERROR: + case NOT_AUTHORIZED: case PACKET_IDENTIFIER_IN_USE: case QUOTA_EXCEEDED: - case SHARED_SUBSCRIPTION_NOT_SUPPORTED: + case SHARED_SUBSCRIPTIONS_NOT_SUPPORTED: case SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED: - case WILDCARD_SUBSCRIPTION_NOT_SUPPORTED: + case WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED: return MqttQoS.FAILURE.value(); } } diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java index e98f445e5d..ad6e50a62c 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractContainerTest.java @@ -45,6 +45,8 @@ import java.util.Random; @Listeners(TestListener.class) public abstract class AbstractContainerTest { + protected static final int TIMEOUT = 30; + protected final static String TEST_PROVISION_DEVICE_KEY = "test_provision_key"; protected final static String TEST_PROVISION_DEVICE_SECRET = "test_provision_secret"; protected static long timeoutMultiplier = 1; diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index c2ab3bd02f..bc033364e7 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -23,7 +23,12 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.codec.mqtt.MqttReasonCodes; +import io.netty.handler.codec.mqtt.MqttVersion; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; @@ -33,9 +38,9 @@ import org.testng.annotations.Test; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.mqtt.MqttClient; +import org.thingsboard.mqtt.MqttClientCallback; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttHandler; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; @@ -56,7 +61,9 @@ import org.thingsboard.server.msa.mapper.WsTelemetryResponse; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Random; @@ -65,6 +72,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.fail; @@ -76,6 +84,9 @@ import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevi @Slf4j public class MqttClientTest extends AbstractContainerTest { + private static final String TRANSPORT_HOST = "localhost"; + private static final int TRANSPORT_PORT = 1883; + private Device device; AbstractListeningExecutor handlerExecutor; @@ -100,6 +111,7 @@ public class MqttClientTest extends AbstractContainerTest { handlerExecutor.destroy(); } } + @Test public void telemetryUpload() throws Exception { DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); @@ -194,7 +206,7 @@ public class MqttClientTest extends AbstractContainerTest { String sharedAttributeValue = StringUtils.randomAlphanumeric(8); sharedAttributes.addProperty("sharedAttr", sharedAttributeValue); JsonNode sharedAttribute = mapper.readTree(sharedAttributes.toString()); - testRestClient.postTelemetryAttribute(DataConstants.DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); + testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); // Subscribe to attributes response mqttClient.on("v1/devices/me/attributes/response/+", listener, MqttQoS.AT_LEAST_ONCE).get(); @@ -237,7 +249,7 @@ public class MqttClientTest extends AbstractContainerTest { sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue); JsonNode sharedAttribute = mapper.readTree(sharedAttributes.toString()); - testRestClient.postTelemetryAttribute(DataConstants.DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); + testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); assertThat(mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()) @@ -317,8 +329,8 @@ public class MqttClientTest extends AbstractContainerTest { mqttClient.publish("v1/devices/me/rpc/request/" + requestId, Unpooled.wrappedBuffer(clientRequest.toString().getBytes())).get(); // Check the response from the server - TimeUnit.SECONDS.sleep(1 * timeoutMultiplier); - MqttEvent responseFromServer = listener.getEvents().poll(1 * timeoutMultiplier, TimeUnit.SECONDS); + TimeUnit.SECONDS.sleep(3 * timeoutMultiplier); + MqttEvent responseFromServer = listener.getEvents().poll(3 * timeoutMultiplier, TimeUnit.SECONDS); Integer responseId = Integer.valueOf(Objects.requireNonNull(responseFromServer).getTopic().substring("v1/devices/me/rpc/response/".length())); assertThat(responseId).isEqualTo(requestId); assertThat(mapper.readTree(responseFromServer.getMessage()).get("response").asText()).isEqualTo("requestReceived"); @@ -444,6 +456,84 @@ public class MqttClientTest extends AbstractContainerTest { assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND"); } + @Test + public void regularDisconnect() throws Exception { + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5); + final List returnCodeByteValue = new ArrayList<>(); + MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); + mqttClient.setCallback(callbackForDisconnectWithReturnCode); + mqttClient.disconnect(); + Thread.sleep(1000); + assertThat(returnCodeByteValue.size()).isEqualTo(1); + MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0)); + assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.NORMAL_DISCONNECT); + } + + @Test + public void clientSessionTakenOverDisconnect() throws Exception { + DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId()); + + MqttMessageListener listener = new MqttMessageListener(); + MqttClient mqttClient = getMqttClient(deviceCredentials, listener, MqttVersion.MQTT_5); + final List returnCodeByteValue = new ArrayList<>(); + MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); + mqttClient.setCallback(callbackForDisconnectWithReturnCode); + + Thread.sleep(1000); + + MqttMessageListener dummyListener = new MqttMessageListener(); + MqttClient dummyMqttClient = getMqttClient(deviceCredentials, dummyListener, MqttVersion.MQTT_5); + final List returnCodeByteValueSecondClient = new ArrayList<>(); + MqttClientCallback callbackForDisconnectWithReturnCodeDummy = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValueSecondClient); + dummyMqttClient.setCallback(callbackForDisconnectWithReturnCodeDummy); + + Awaitility + .await() + .alias("Check device disconnect.") + .atMost(TIMEOUT*timeoutMultiplier, TimeUnit.SECONDS) + .until(() -> returnCodeByteValue.size() > 0); + + assertThat(returnCodeByteValueSecondClient).isEmpty(); + assertThat(returnCodeByteValue).isNotEmpty(); + + MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0)); + + dummyMqttClient.disconnect(); + + assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.SESSION_TAKEN_OVER); + } + + @Test + public void clientPublishForRegularTopicByProvisionClient() throws Exception { + MqttClient mqttClient = getMqttClient("provision", new MqttMessageListener(), MqttVersion.MQTT_5); + final List returnCodeByteValue = new ArrayList<>(); + MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); + mqttClient.setCallback(callbackForDisconnectWithReturnCode); + mqttClient.publish("v1/devices/me/telemetry", Unpooled.wrappedBuffer("test".getBytes()), MqttQoS.AT_LEAST_ONCE).get(); + Thread.sleep(1000); + assertThat(returnCodeByteValue).isNotEmpty(); + MqttReasonCodes.Disconnect returnCode = MqttReasonCodes.Disconnect.valueOf(returnCodeByteValue.get(0)); + assertThat(returnCode).isEqualTo(MqttReasonCodes.Disconnect.TOPIC_NAME_INVALID); + } + + @Test + public void clientConnectWithBadCredentials() throws Exception { + MqttClient mqttClient = getMqttClient("unknownAccessToken", new MqttMessageListener(), MqttVersion.MQTT_5, false); + final List returnCodeByteValue = new ArrayList<>(); + MqttClientCallback callbackForDisconnectWithReturnCode = getCallbackWrapperForDisconnectWithReturnCode(returnCodeByteValue); + mqttClient.setCallback(callbackForDisconnectWithReturnCode); + try { + mqttClient.connect(TRANSPORT_HOST, TRANSPORT_PORT).get(1, TimeUnit.SECONDS); + } catch (TimeoutException ignored) { + } + assertThat(returnCodeByteValue).isNotEmpty(); + MqttConnectReturnCode returnCode = MqttConnectReturnCode.valueOf(returnCodeByteValue.get(0)); + assertThat(returnCode).isIn(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); + } + private RuleChainId createRootRuleChainForRpcResponse() throws Exception { RuleChain newRuleChain = new RuleChain(); newRuleChain.setName("testRuleChain"); @@ -477,8 +567,34 @@ public class MqttClientTest extends AbstractContainerTest { return defaultRuleChain.get().getId(); } + private MqttClientCallback getCallbackWrapperForDisconnectWithReturnCode(List returnCodeByteValueWrapper) { + return new MqttClientCallback() { + @Override + public void connectionLost(Throwable cause) { + } + + @Override + public void onSuccessfulReconnect() { + } + + @Override + public void onDisconnect(MqttMessage mqttDisconnectMessage) { + log.info("Disconnected with reason: {}", mqttDisconnectMessage); + returnCodeByteValueWrapper.add(((MqttReasonCodeAndPropertiesVariableHeader) mqttDisconnectMessage.variableHeader()).reasonCode()); + } + }; + } + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException { - return getMqttClient(deviceCredentials.getCredentialsId(), listener); + return getMqttClient(deviceCredentials.getCredentialsId(), listener, MqttVersion.MQTT_3_1_1, true); + } + + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener, MqttVersion mqttVersion) throws InterruptedException, ExecutionException { + return getMqttClient(deviceCredentials.getCredentialsId(), listener, mqttVersion, true); + } + + private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener, MqttVersion mqttVersion, boolean connect) throws InterruptedException, ExecutionException { + return getMqttClient(deviceCredentials.getCredentialsId(), listener, mqttVersion, connect); } private String getOwnerId() { @@ -486,12 +602,23 @@ public class MqttClientTest extends AbstractContainerTest { } private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException { + return getMqttClient(username, listener, MqttVersion.MQTT_3_1_1, true); + } + + private MqttClient getMqttClient(String username, MqttMessageListener listener, MqttVersion mqttVersion) throws InterruptedException, ExecutionException { + return getMqttClient(username, listener, mqttVersion, true); + } + + private MqttClient getMqttClient(String username, MqttMessageListener listener, MqttVersion mqttVersion, boolean connect) throws InterruptedException, ExecutionException { MqttClientConfig clientConfig = new MqttClientConfig(); clientConfig.setOwnerId(getOwnerId()); clientConfig.setClientId("MQTT client from test"); clientConfig.setUsername(username); + clientConfig.setProtocolVersion(mqttVersion); MqttClient mqttClient = MqttClient.create(clientConfig, listener, handlerExecutor); - mqttClient.connect("localhost", 1883).get(); + if (connect) { + mqttClient.connect(TRANSPORT_HOST, TRANSPORT_PORT).get(); + } return mqttClient; } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java index c7ff2746bd..f14d9afa5a 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttChannelHandler.java @@ -84,6 +84,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler case PUBCOMP: handlePubcomp(msg); break; + case DISCONNECT: + handleDisconnect(msg); + break; } } else { log.error("[{}] Message decoding failed: {}", client.getClientConfig().getClientId(), msg.decoderResult().cause().getMessage()); @@ -204,6 +207,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler // Don't start reconnect logic here break; } + if (this.client.getCallback() != null) { + this.client.getCallback().onConnAck(message); + } } private void handleSubAck(MqttSubAckMessage message) { @@ -224,6 +230,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler if (!pendingSubscription.getFuture().isDone()) { pendingSubscription.getFuture().setSuccess(null); } + if (this.client.getCallback() != null) { + this.client.getCallback().onSubAck(message); + } } private void handlePublish(Channel channel, MqttPublishMessage message) { @@ -267,6 +276,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler this.client.getServerSubscriptions().remove(unsubscription.getTopic()); unsubscription.getFuture().setSuccess(null); this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId()); + if (this.client.getCallback() != null) { + this.client.getCallback().onUnsubAck(message); + } } private void handlePuback(MqttPubAckMessage message) { @@ -278,6 +290,9 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.onPubackReceived(); this.client.getPendingPublishes().remove(message.variableHeader().messageId()); pendingPublish.getPayload().release(); + if (this.client.getCallback() != null) { + this.client.getCallback().onPubAck(message); + } } private void handlePubrec(Channel channel, MqttMessage message) { @@ -301,7 +316,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler future = Futures.transform(future, x -> { this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId()); return null; - }, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } future.addListener(() -> { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0); @@ -319,6 +334,12 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler pendingPublish.onPubcompReceived(); } + private void handleDisconnect(MqttMessage message) { + if (this.client.getCallback() != null) { + this.client.getCallback().onDisconnect(message); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { try { diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java index 42acc05f42..f7963ad4b1 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientCallback.java @@ -15,6 +15,12 @@ */ package org.thingsboard.mqtt; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttSubAckMessage; +import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; + /** * Created by Valerii Sosliuk on 12/30/2017. */ @@ -32,4 +38,19 @@ public interface MqttClientCallback { * */ void onSuccessfulReconnect(); + + default void onConnAck(MqttConnAckMessage connAckMessage) { + } + + default void onPubAck(MqttPubAckMessage pubAckMessage) { + } + + default void onSubAck(MqttSubAckMessage pubAckMessage) { + } + + default void onUnsubAck(MqttUnsubAckMessage unsubAckMessage) { + } + + default void onDisconnect(MqttMessage mqttDisconnectMessage) { + } } diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index 6781bb171a..cdf70a05ea 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -45,6 +45,8 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ListeningExecutor; @@ -87,6 +89,7 @@ final class MqttClientImpl implements MqttClient { private volatile boolean reconnect = false; private String host; private int port; + @Getter private MqttClientCallback callback; private final ListeningExecutor handlerExecutor; @@ -426,7 +429,12 @@ final class MqttClientImpl implements MqttClient { disconnected = true; if (this.channel != null) { MqttMessage message = new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0)); - this.sendAndFlushPacket(message).addListener(future1 -> channel.close()); + ChannelFuture channelFuture = this.sendAndFlushPacket(message); + eventLoop.schedule(() -> { + if (!channelFuture.isDone()) { + this.channel.close(); + } + }, 500, TimeUnit.MILLISECONDS); } } diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index d210138852..bfb144c210 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -143,6 +143,8 @@ transport: proxy_enabled: "${MQTT_PROXY_PROTOCOL_ENABLED:false}" # MQTT processing timeout in milliseconds timeout: "${MQTT_TIMEOUT:10000}" + # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. + disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism netty: # Netty leak detector level From 764de2c8d69e4098d62946e5f10597cebdeccb16 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 7 May 2024 18:12:42 +0300 Subject: [PATCH 2/2] Imports and refactoring --- .../thingsboard/server/msa/connectivity/MqttClientTest.java | 3 ++- .../src/main/java/org/thingsboard/mqtt/MqttClientImpl.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java index bc033364e7..55198accce 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/MqttClientTest.java @@ -41,6 +41,7 @@ import org.thingsboard.mqtt.MqttClient; import org.thingsboard.mqtt.MqttClientCallback; import org.thingsboard.mqtt.MqttClientConfig; import org.thingsboard.mqtt.MqttHandler; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; @@ -249,7 +250,7 @@ public class MqttClientTest extends AbstractContainerTest { sharedAttributes.addProperty(sharedAttributeName, sharedAttributeValue); JsonNode sharedAttribute = mapper.readTree(sharedAttributes.toString()); - testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); + testRestClient.postTelemetryAttribute(DataConstants.DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute); MqttEvent event = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS); assertThat(mapper.readValue(Objects.requireNonNull(event).getMessage(), JsonNode.class).get(sharedAttributeName).asText()) diff --git a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java index cdf70a05ea..86ce95e350 100644 --- a/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java +++ b/netty-mqtt/src/main/java/org/thingsboard/mqtt/MqttClientImpl.java @@ -46,7 +46,6 @@ import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ListeningExecutor;